1

我正在使用 Confluent Platform v5.4、Mongo DB 3.6 版和 Kafka Mongo DB 连接器。我已将 Kafka Mongo DB 源连接器配置为在 Mongo DB 集合中创建新记录时将数据从 Mongo DB 推送到 Kafka 主题。我已将我的 Kafka 代理配置为将数据日志保留 1 天,之前设置为默认配置 7 天。现在我的问题是,我在 Mongo DB 日志中反复出现这样的错误 -

2020-09-08T14:58:34.500+0000 I COMMAND  [conn66] command local.oplog.rs command: aggregate { aggregate: "topic_1", pipeline: [ { $changeStream: { fullDocument: "default", resumeAfter: { _data: BinData(0, 825F50DCCE00000008463C5F6964003C35363962633732642D386430352D343034622D616262362D64363632656136663464303000005A1004B31FA99AA5C141CB9EE9AA845877B80D04) } } } ], cursor: {}, $db: "ctc", $clusterTime: { clusterTime: Timestamp(1599577091, 6), signature: { hash: BinData(0, 6550821FD90928ABC3A2DFE066FC34E824A83762), keyId: 6847473605622628353 } }, lsid: { id: UUID("2a2e5878-4743-4e83-a4fd-eed68b5efe02") }, $readPreference: { mode: "primaryPreferred" } } planSummary: COLLSCAN exception: resume of change stream was not possible, as the resume token was not found. {_data: BinData(0, "825F579ACE00000008463C5F6964003C31313632333863352D316561612D343961652D613437632D30366263336131333436313900005A1004B31FA99AA5C141CB9EE9AA845877B80D04")} code:ChangeStreamFatalError numYields:8932 reslen:455 locks:{ Global: { acquireCount: { r: 17872 }, acquireWaitCount: { r: 97 }, timeAcquiringMicros: { r: 350686 } }, Database: { acquireCount: { r: 8936 } }, Collection: { acquireCount: { r: 1 } }, oplog: { acquireCount: { r: 8935 } } } protocol:op_msg 22198ms

我知道在 Kafka 主题和 Mongo DB 的 Oplog 之间,有一个共享的恢复令牌以保持数据流正常运行。但是有时 Oplog 会被刷掉(因为 Oplog 的大小只能是分配给 Mongo DB 的内存的一定百分比)或者对应于 Kafka 主题的数据被删除,从而导致流的破裂。

我的问题是-如何避免流中断?如何确保恢复令牌始终存在于 Oplog 和 Kafka 主题中?有什么方法可以从可用的地方手动获取恢复令牌并在丢失的地方更新恢复令牌?

4

1 回答 1

1

在 4.4 兼容的驱动程序中进行了更改,以处理长时间没有任何更改时从 oplog 末尾掉落的更改流恢复令牌。这里描述了 Ruby 驱动程序的用户端;根据需要适应 Java。关键是您需要使用tryNext并从更改流中读取恢复令牌,而不是从返回的更改文档中读取。

如果您正在执行上述所有操作,则需要:

  • 增加oplog窗口,保留更多的oplog,或者
  • 更快地处理更改以保持在 oplog 窗口内。
于 2020-09-09T02:03:37.423 回答