0

我创建了一个包含 3 个代理和以下详细信息的 kafka 集群:

  1. 创建了 3 个主题(“initial1”、“initial2”、“final”),每个主题的复制因子 = 3 和分区 = 2。
  2. 创建了 2 个生产者,producer1 推送到“initial1”主题,producer2 推送到“initial2”主题。
  3. 创建了一个 Streams 应用程序 - stream1 - 使用左连接从“initial1”和“initial2”主题读取消息,对其进行处理并写入“final”主题。

以下是流配置:

kafka.sourcetopic.one=initial1
kafka.sourcetopic.two=initial2
kafka.desttopic.one=final
kafka.stream.id=Stream11
kafka.stream.state.dir=kafka-streams-11
kafka.stream.server=PLAINTEXT://x1.x2.x3.x4:9092, PLAINTEXT://x1.x2.x3.x4:9093, PLAINTEXT://x1.x2.x3.x4:9094
kafka.auto-commit=true
kafka.auto-commit-interval=10
kafka.auto-offset-reset=latest
kafka.retries=2
kafka.retries.backoff.ms=10
kafka.request.timeout_ms=900000
kafka.timewindow_ms=36000000
kafka.schema.registry.url=http://x1.x2.x3.x4:8081

场景 1:如果 producer1 和 stream1 都在运行

当 producer1 将消息(比如 1000 条消息)推送到“initial1”主题时,stream1 能够读取它。

场景 2:如果 producer1 和 stream1 停止。经过很长的时间间隔(比如 10 小时),producer1 首先启动并推送 1000 条消息。推送 1000 条消息后,启动 stream1。

stream1 不会读取 producer1 推送的那 1000 条消息。

询问

为什么 stream1 在很长一段时间后重新启动时无法读取过去的消息?如果流在小间隔(比如 5 分钟)内重新启动,则它能够读取生产者 1 推送的过去消息。

4

0 回答 0