- 我想根据创建消息的时间戳来批处理消息。
- 此外,我想在固定时间窗口(1 分钟)中批量处理这些消息。
- 只有在窗口通过后,才应该将批次推到下游。
为此,处理器 API 似乎或多或少适合(la KStream 批处理窗口):
public void process(String key, SensorData sensorData) {
//epochTime is rounded to our prefered time window (1 minute)
long epochTime = Sensordata.epochTime;
ArrayList<SensorData> data = messageStore.get(epochTime);
if (data == null) {
data = new ArrayList<SensorData>();
}
data.add(sensorData);
messageStore.put(id, data);
this.context.commit();
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(60000); // equal to 1 minute
}
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, ArrayList<SensorData>> it = messageStore.all();
while (it.hasNext()) {
KeyValue<String, ArrayList<SensorData>> entry = it.next();
this.context.forward(entry.key, entry.value);
}
//reset messageStore
}
但是,这种方法有一个主要缺点:我们不使用 Kafka Streams 窗口。
- 不考虑乱序消息。
- 实时操作时,标点计划应等于所需的批处理时间窗口。如果我们将其设置为短,批处理将被转发并且下游计算将开始快速。如果设置为长,并且当批处理窗口尚未完成时触发标点符号,同样的问题。
- 此外,在保持标点时间表(1 分钟)的同时重放历史数据将仅在 1 分钟后触发第一次计算。如果是这样,那将炸毁 statestore 并且感觉不对。
考虑到这些点,我应该使用 Kafka Streams 窗口。但这只有在 Kafka Streams DSL 中才有可能......
在这方面的任何困难都会很棒。