3
  • 我想根据创建消息的时间戳来批处理消息。
  • 此外,我想在固定时间窗口(1 分钟)中批量处理这些消息。
  • 只有在窗口通过后,才应该将批次推到下游。

为此,处理器 API 似乎或多或少适合(la KStream 批处理窗口):

public void process(String key, SensorData sensorData) {
    //epochTime is rounded to our prefered time window (1 minute)
    long epochTime = Sensordata.epochTime;

    ArrayList<SensorData> data = messageStore.get(epochTime);
    if (data == null) {
        data = new ArrayList<SensorData>();
    }

    data.add(sensorData);
    messageStore.put(id, data);
    this.context.commit();
}

@Override
public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(60000); // equal to 1 minute
}

@Override
public void punctuate(long streamTime) {
    KeyValueIterator<String, ArrayList<SensorData>> it = messageStore.all();
    while (it.hasNext()) {
        KeyValue<String, ArrayList<SensorData>> entry = it.next();
        this.context.forward(entry.key, entry.value);
    }
    //reset messageStore
}

但是,这种方法有一个主要缺点:我们不使用 Kafka Streams 窗口。

  • 不考虑乱序消息。
  • 实时操作时,标点计划应等于所需的批处理时间窗口。如果我们将其设置为短,批处理将被转发并且下游计算将开始快速。如果设置为长,并且当批处理窗口尚未完成时触发标点符号,同样的问题。
  • 此外,在保持标点时间表(1 分钟)的同时重放历史数据将仅在 1 分钟后触发第一次计算。如果是这样,那将炸毁 statestore 并且感觉不对。

考虑到这些点,我应该使用 Kafka Streams 窗口。但这只有在 Kafka Streams DSL 中才有可能......

在这方面的任何困难都会很棒。

4

1 回答 1

2

process()您可以使用,transform()或在 DSL中混合和匹配 DSL 和处理器 API transformValues()(已经有一些其他关于此的 SO 问题,因此我不再详细说明)。因此,您可以将常规窗口构造与自定义(下游)运算符结合使用来保留结果(并删除重复数据)。一些重复已经在您的窗口操作符中自动发生(从 Kafka 开始0.10.1;请参阅http://docs.confluent.io/current/streams/developer-guide.html#memory-management),但如果您想得到一个结果缓存不会为你做这件事。

关于标点:它是基于进度(即流时间)而不是基于挂钟时间触发的 - 因此,如果您重新处理旧数据, if 将被调用与原始运行完全相同的次数(只是如果您考虑挂钟时间,因为您处理旧数据的速度更快)。如果您想了解更多详细信息,还有一些关于此的 SO 问题。

但是,我一般考虑:为什么您只需要一个结果?如果您进行流处理,您可能希望构建下游消费者应用程序以能够处理对结果的更新。这是 Kafka 的固有设计:使用变更日志。

于 2016-12-15T17:45:10.447 回答