I am trying to run a basic pipeline from one topic to another using Kafka Streams 0.10.2.1:
KStream<ByteBuffer, ByteBuffer> stream = builder.stream("transactions_load");
stream.to("transactions_fact");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
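For context, the `config` object is not shown in the snippet. A minimal sketch of what it might contain is below. The class name `PipelineConfig`, the application id `transactions-copy`, and the choice of `Serdes$ByteBufferSerde` are assumptions for illustration, not taken from my actual setup; the keys are the plain string names of the Kafka Streams settings as I understand them for the 0.10.2 line.

```java
import java.util.Properties;

// Hypothetical sketch of the `config` passed to KafkaStreams in the snippet above.
class PipelineConfig {
    static Properties baseConfig(String bootstrapServers) {
        Properties config = new Properties();
        // Required by Kafka Streams; also used as the consumer group id.
        config.put("application.id", "transactions-copy"); // assumed name
        config.put("bootstrap.servers", bootstrapServers);
        // Treat keys and values as opaque bytes, matching KStream<ByteBuffer, ByteBuffer>.
        // Assumption: the serde is given by class name and resolved by Kafka at startup.
        config.put("key.serde", "org.apache.kafka.common.serialization.Serdes$ByteBufferSerde");
        config.put("value.serde", "org.apache.kafka.common.serialization.Serdes$ByteBufferSerde");
        return config;
    }
}
```

The returned `Properties` would then be passed as the second argument of `new KafkaStreams(builder, config)`.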
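If I look at the target topic, I can see records being produced there. Records are produced for about 1 minute, and then the process fails with the following error: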
ERROR task [0_19] Error sending record to topic transactions_fact. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl:102)
[2017-10-02 16:30:54,516]org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
ERROR task [0_9] Error sending record to topic transactions_fact. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl:102)
[2017-10-02 16:30:54,519]org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
....
[2017-10-02 16:30:54,650]org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-14: 30068 ms has passed since last append
ERROR task [0_2] Error sending record to topic transactions_fact. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl:102)
[2017-10-02 16:30:54,650]org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-14: 30061 ms has passed since last append
ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_0 state: (org.apache.kafka.streams.processor.internals.StreamThread:813)
[2017-10-02 16:31:02,355]org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
...
ERROR stream-thread [StreamThread-1] Failed while executing StreamTask 0_19 due to flush state: (org.apache.kafka.streams.processor.internals.StreamThread:503)
[2017-10-02 16:31:02,378]org.apache.kafka.streams.errors.StreamsException: task [0_19] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422)
at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnTasks(StreamThread.java:501)
at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:449)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:391)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:372)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
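To see which target partitions are affected, the log above can be summarized with a small helper. This is only a sketch: `ExpiredBatchSummary` is a hypothetical class, not part of Kafka, and its regular expression assumes the message text stays exactly as it appears in the log above. Because the same exception is printed again in each "Caused by" line, the totals over-count, so they are better read as "which partitions show up" than as exact record counts.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: tallies "Expiring N record(s)" messages per target partition.
class ExpiredBatchSummary {
    // Mirrors the TimeoutException message text seen in the log above.
    private static final Pattern EXPIRY = Pattern.compile(
        "Expiring (\\d+) record\\(s\\) for (\\S+)-(\\d+): (\\d+) ms has passed since last append");

    // Returns "topic-partition" -> sum of the record counts reported on matching lines.
    static Map<String, Integer> expiredPerPartition(List<String> logLines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = EXPIRY.matcher(line);
            if (m.find()) {
                String partition = m.group(2) + "-" + m.group(3);
                totals.merge(partition, Integer.parseInt(m.group(1)), Integer::sum);
            }
        }
        return totals;
    }
}
```

If only a few partitions appear in the result, that would point towards the brokers leading those partitions; if all of them appear, a general connectivity or throughput problem seems more likely.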
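More information:
- I am running a single instance of the streams application (on my laptop)
- I am writing about 400 records per second to the source topic
- The source topic has 20 partitions
- The target topic has 20 partitions
The error suggests that the problem is with producing to the target topic. Is that right, and what would be the next step to debug this further?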
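One experiment I am considering: the roughly 30000 ms in the messages matches the producer's default `request.timeout.ms`, so raising it for the embedded producer should show whether the batches are merely slow to be sent or never get sent at all. The sketch below is an assumption-laden illustration: `ProducerTimeoutExperiment` is a hypothetical helper, and it assumes this Kafka Streams version honors the `producer.` prefix for producer-only settings.

```java
import java.util.Properties;

// Hypothetical helper: copies an existing Streams config and raises the producer timeout.
class ProducerTimeoutExperiment {
    static Properties withLongerProducerTimeout(Properties base, int requestTimeoutMs) {
        Properties config = new Properties();
        config.putAll(base); // leave the caller's config untouched
        // Assumption: the "producer." prefix scopes the setting to the embedded producer only.
        config.put("producer.request.timeout.ms", Integer.toString(requestTimeoutMs));
        return config;
    }
}
```

If the errors simply reappear after the longer interval, the timeout itself is probably not the cause, and broker reachability from the laptop would be the next thing to check.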