I am attempting to use <KStream>.process() with a TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer session timeout, after which Kafka considers the consumer dead and releases the partition.
I tried raising the poll frequency and commit interval to avoid this:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
Unfortunately these errors keep occurring:
(lots of these)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Followed by these:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
Clearly I need to be sending heartbeats back to the server more often. How?
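The exception message itself suggests raising the session timeout or shrinking the batches returned by poll(), so I assume the client-side knobs would be something like the following (untested sketch; the keys are real ConsumerConfig constants, but I'm not certain how they interact with StreamsConfig and the embedded consumer):

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Assumed values, for illustration only:
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");    // tolerate longer gaps between poll() calls
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); // heartbeat well within the session timeout
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");        // smaller batches so each poll loop is shorter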
My topology is:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

KTable<Windowed<String>, String> kt = lines.aggregateByKey(
        new DBAggregateInit(),
        new DBAggregate(),
        TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
The KTable groups values by key every 30 seconds. In Processor.init() I call context.schedule(30000).
DBProcessorSupplier provides an instance of DBProcessor, which is an implementation of AbstractProcessor with all the overrides provided. All they do is LOG, so I know when each one is being hit.
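For reference, here is DBProcessor reduced to its essentials (DBProcessor and DBProcessorSupplier are my own classes; the bodies are trimmed down to the logging described above):

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DBProcessor extends AbstractProcessor<Windowed<String>, String> {
    private static final Logger LOG = LoggerFactory.getLogger(DBProcessor.class);

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        context.schedule(30000);  // request punctuate() every 30 seconds
    }

    @Override
    public void process(Windowed<String> key, String value) {
        LOG.info("process: {}", key);  // fires for every record
    }

    @Override
    public void punctuate(long timestamp) {
        LOG.info("punctuate: {}", timestamp);  // this is the call I never see
    }
}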
It's a pretty simple topology, but it's clear I'm missing a step somewhere.
Edit:
I understand that I could adjust this on the server side, but I'm hoping for a client-side solution. I like the notion of partitions becoming available again quickly when a client exits or dies.
Edit:
In an attempt to simplify the problem, I removed the aggregation step from the graph. It's now just consumer -> processor(). (If I send the consumer output straight to .print() it works very quickly, so I know that part is fine.) (Similarly, if I output the aggregation (KTable) via .print(), that seems fine too.)
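For clarity, the simplified graph is now roughly this (with DBProcessorSupplier adjusted for the un-windowed String key):

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

// Aggregation removed: records go straight from the consumer into the processor.
lines.process(new DBProcessorSupplier());

new KafkaStreams(kStreamBuilder, streamsConfig).start();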
What I found is that .process(), which should be triggering .punctuate() every 30 seconds, actually blocks for variable lengths of time, and any output appears somewhat randomly (if at all).
Further:
I set the log level to DEBUG and reran. I'm seeing lots of these messages:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>
but a breakpoint in the .punctuate() function never gets hit. So it's doing lots of work but never gives me the chance to use it.