
I am trying to run a basic pipeline from one topic to another using Kafka Streams 0.10.2.1:

    // KStreamBuilder is the DSL entry point in 0.10.2.x
    KStreamBuilder builder = new KStreamBuilder();

    KStream<ByteBuffer, ByteBuffer> stream = builder
            .stream("transactions_load");

    stream.to("transactions_fact");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

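For completeness, the `config` object passed to `KafkaStreams` is essentially just the defaults. A minimal sketch of how I build it (the application id and broker address are placeholders for my local setup, and I use byte-level serdes to match the `KStream<ByteBuffer, ByteBuffer>` above):

```java
import java.util.Properties;

public class ConfigSketch {

    public static Properties buildConfig() {
        Properties props = new Properties();
        // "application.id" and "bootstrap.servers" are the two mandatory
        // Streams settings; the values are placeholders for my local setup.
        props.put("application.id", "transactions-pipeline");
        props.put("bootstrap.servers", "localhost:9092");
        // Byte-level serdes to match KStream<ByteBuffer, ByteBuffer>
        // ("key.serde"/"value.serde" are the 0.10.2.x config names).
        props.put("key.serde",
                "org.apache.kafka.common.serialization.Serdes$ByteBufferSerde");
        props.put("value.serde",
                "org.apache.kafka.common.serialization.Serdes$ByteBufferSerde");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("bootstrap.servers"));
    }
}
```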
If I look at the target topic, I can see records being produced there. Records are produced for about a minute, and then the process fails with the following error:

ERROR task [0_19] Error sending record to topic transactions_fact. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl:102) 
 [2017-10-02 16:30:54,516]org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
ERROR task [0_9] Error sending record to topic transactions_fact. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl:102) 
 [2017-10-02 16:30:54,519]org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
 ....
 [2017-10-02 16:30:54,650]org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-14: 30068 ms has passed since last append
ERROR task [0_2] Error sending record to topic transactions_fact. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl:102) 
 [2017-10-02 16:30:54,650]org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-14: 30061 ms has passed since last append
ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_0 state:  (org.apache.kafka.streams.processor.internals.StreamThread:813) 
 [2017-10-02 16:31:02,355]org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
...
ERROR stream-thread [StreamThread-1] Failed while executing StreamTask 0_19 due to flush state:  (org.apache.kafka.streams.processor.internals.StreamThread:503) 
 [2017-10-02 16:31:02,378]org.apache.kafka.streams.errors.StreamsException: task [0_19] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422)
    at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnTasks(StreamThread.java:501)
    at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:449)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:391)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:372)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 24 record(s) for transactions_fact-5: 30012 ms has passed since last append

Some more info:

  • I am running a single instance of the streams application (on my laptop)
  • I am writing about 400 records per second to the source topic
  • The source topic has 20 partitions
  • The target topic has 20 partitions

The error suggests that the problem is with producing to the target topic? What would be the next steps to debug this further?
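One thing I considered trying is raising the producer-side timeout behind the "30012 ms has passed since last append" message. My understanding (untested assumption) is that Kafka Streams forwards unrecognized producer configs to its internal producer, so a sketch would be something like this, where the values are arbitrary:

```java
import java.util.Properties;

public class ProducerTuningSketch {

    // Hypothetical tuning: raise the producer-side batch expiry timeout that
    // appears in the TimeoutException (the 30000 ms default of
    // "request.timeout.ms"), and allow a few retries. Values are arbitrary.
    public static Properties withProducerTuning(Properties streamsProps) {
        streamsProps.put("request.timeout.ms", "60000");
        streamsProps.put("retries", "5");
        return streamsProps;
    }

    public static void main(String[] args) {
        Properties p = withProducerTuning(new Properties());
        System.out.println(p.getProperty("request.timeout.ms"));
    }
}
```

But I am not sure whether papering over the timeout is the right fix, or whether it just hides a broker/throughput problem.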
