10

I want to batch messages with KStream interface.

I have a Stream with Keys/values I tried to collect them in a tumbling window and then I wanted to process the complete window at once.

builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
                .aggregateByKey(
                        HashMap::new,
                        (aggKey, value, aggregate) -> {
                            aggregate.put(value.getUuid, value);
                            return aggregate;
                        },
                        TimeWindows.of("intentWindow", 100),
                        longSerde, mapSerde)
                .foreach((wk, values) -> {

The thing is foreach gets called on each update to the KTable. I would like to process the whole window once it is complete. As in collect Data from 100 ms and then process at once. In for each.

16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 298
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 299
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 1
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 2
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 3
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 4
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 5
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 6

at some point the new window starts with 1 entry in the map. So I don't even know when the window is full.

any hints to to batch process in kafka streams

4

3 回答 3

5

我的实际任务是将更新从流推送到 redis,但我不想单独读取/更新/写入,即使 redis 很快。我现在的解决方案是使用 KStream.process() 提供一个处理器,该处理器添加到进程队列并实际处理队列中的标点。

public class BatchedProcessor extends AbstractProcessor{

...
BatchedProcessor(Writer writer, long schedulePeriodic)

@Override
public void init(ProcessorContext context) {
    super.init(context);
    context.schedule(schedulePeriodic);
}

@Override
public void punctuate(long timestamp) {
    super.punctuate(timestamp);
    writer.processQueue();
    context().commit();
}

@Override
public void process(Long aLong, IntentUpdateEvent intentUpdateEvent) {
    writer.addToQueue(intentUpdateEvent);
}

我仍然需要测试,但它解决了我遇到的问题。人们可以很容易地以一种非常通用的方式编写这样的处理器。API 非常整洁干净,但是一个 processBatched((List batchedMessaages)-> ..., timeInterval OR countInterval) 只使用标点符号来处理批处理并在此时提交并在 Store 中收集 KeyValues 可能是一个有用的补充。

但也许它的目的是使用处理器来解决这个问题,并将 API 保持在一条消息中,同时关注低延迟。

于 2016-08-24T12:17:20.713 回答
4

现在(从 Kafka 0.10.0.0 / 0.10.0.1 开始):您描述的窗口行为是“按预期工作”。也就是说,如果您收到 1,000 条传入消息,您将(当前)始终看到 1,000 条更新在下游使用最新版本的 Kafka / Kafka Streams。

展望未来:Kafka 社区正在开发新功能,以使这种更新率行为更加灵活(例如,允许您在上面描述的行为作为您想要的行为)。有关详细信息,请参阅KIP-63:统一流中的存储和下游缓存

于 2016-08-23T18:00:09.173 回答
3

====== 更新 ======

在进一步测试中,这不起作用。正确的方法是使用@friedrich-nietzsche概述的处理器。我对自己的答案投了反对票.... grrrr。

====================

我仍在与这个 API 搏斗(但我喜欢它,所以花时间很值得:)),我不确定你想从你的代码示例结束的下游完成什么,但它看起来与我得到的相似在职的。高水平是:

从源读取的对象。它代表一个键和1:∞的事件数,我想每 5 秒(或 TP5s,每 5 秒的事务)发布每个键的事件总数。代码的开头看起来相同,但我使用:

  1. KStreamBuilder溪流
  2. reduceByKey
  3. 到一个窗口(5000)
  4. 到一个新的流,它每 5 秒获取每个键的累积值。
  5. 将该流映射到每个键的新KeyValue
  6. 水槽主题。

在我的情况下,每个窗口期,我可以将所有事件减少到每个键一个事件,所以这是可行的。如果您想保留每个窗口的所有单个事件,我假设可以使用 reduce 将每个实例映射到实例集合(可能使用相同的键,或者您可能需要一个新键)并在每个窗口期结束时,下游流将一次性获得一堆事件集合(或者可能只是所有事件的一个集合)。它看起来像这样,经过消毒和 Java 7-ish:

    builder.stream(STRING_SERDE, EVENT_SERDE, SOURCE_TOPICS)
        .reduceByKey(eventReducer, TimeWindows.of("EventMeterAccumulator", 5000), STRING_SERDE, EVENT_SERDE)            
        .toStream()
        .map(new KeyValueMapper<Windowed<String>, Event, KeyValue<String,Event>>() {
            public KeyValue<String, Event> apply(final Windowed<String> key, final Event finalEvent) {
                return new KeyValue<String, Event>(key.key(), new Event(key.window().end(), finalEvent.getCount());
            }
    }).to(STRING_SERDE, EVENT_SERDE, SINK_TOPIC);
于 2016-08-23T20:01:49.647 回答