0

我在 AWS Kinesis Analytics 服务上有一个 Flink 应用程序。我需要根据阈值过滤数据流上的一些值。此外,我正在使用 AWS Systems Manager Parameter Store 服务传递阈值参数。现在,我得到了这个:

  • 在我的主要课程中:
val threshold: Int = ssmParameter.getParameterRequest(ssmClient, "/kinesis/threshold").toInt

val kinesis_deserialization_schema = new KinesisDeserialization[ID]
            val KinesisConsumer = new FlinkKinesisConsumer[ID](
                "Data-Stream",
                kinesis_deserialization_schema,
                consumerProps
            )
            val KinesisSource = env.addSource(KinesisConsumer).name(s"Kinesis Data")
val valid_data = KinesisSource
          .filter(new MyFilter[ID](threshold))
          .name("FilterData")
          .uid("FilterData")
  • 过滤器类:
import cl.mydata.InputData
import org.apache.flink.api.common.functions.FilterFunction

class MyFilter[ID <: InputData](
                                  threshold: Int
                                ) extends FilterFunction[ID] {
  override def filter(value: ID): Boolean = {
      value.myvalue > threshold
    }
  }
}

这很好用,问题是我需要每小时更新阈值参数,因为我的客户可以更改该值。

4

2 回答 2

0

您可以将FilterFunction转换为BroadcastProcessFunction,并在新阈值可用时广播它们。

于 2022-01-28T13:29:59.917 回答
0

或许可以在MyFilter类中实现ProcessingTimeCallback接口,支持定时器操作,可以在onProcessingTime函数中更新阈值

public class MyFilter extends FilterFunction<...> implements ProcessingTimeCallback { 
    int threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);

        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + 3600000, this);
    }

    @Override
    public boolean filter(IN xxx) throws Exception {
        return xxx > threshold;
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        threshold = XXXX;

        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + 3600000, this);
    }
}
于 2022-01-28T08:44:31.407 回答