我们在执行使用 Kafka Connect Elasticsearch 连接器将来自 Kafka 主题的消息发送到 Elasticsearch 的顺序时遇到问题。在主题中,消息的顺序正确且偏移量正确,但如果连续快速创建两条具有相同 ID 的消息,它们会以错误的顺序间歇性地发送到 Elasticsearch。这会导致 Elasticsearch 获得来自倒数第二条消息的数据,而不是来自最后一条消息的数据。如果我们在主题中的两条消息之间添加一两秒的人为延迟,问题就会消失。
这里的文档指出:
通过使用分区级 Kafka 偏移量作为文档版本,并使用
version_mode=external
.
但是,我在任何地方都找不到有关此version_mode
设置的任何文档,以及我们是否需要将其设置在某个地方。
在来自 Kafka Connect 系统的日志文件中,我们可以看到两条消息(对于相同的 ID)以错误的顺序被处理,相隔几毫秒。看起来这些是在不同的线程中处理的,这可能很重要。另请注意,该主题只有一个分区,因此所有消息都在同一个分区中。
下面是日志片段,为清楚起见稍作编辑。Kafka 主题中的消息由 Debezium 填充,我认为这与问题无关,但恰好包含时间戳值。这表明消息的处理顺序错误(尽管它们在 Kafka 主题中的顺序正确,由 Debezium 填充):
[2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "
{
"op": "u",
"before": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM BEFORE SECOND UPDATE >> ...
},
"after": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM AFTER SECOND UPDATE >> ...
},
"source": { ... },
"ts_ms": 1547716205205
}
" (org.apache.http.wire)
...
[2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "
{
"op": "u",
"before": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM BEFORE FIRST UPDATE >> ...
},
"after": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM AFTER FIRST UPDATE >> ...
},
"source": { ... },
"ts_ms": 1547716204190
}
" (org.apache.http.wire)
有谁知道在将消息发送到 Elasticsearch 时如何强制此连接器维护给定文档 ID 的消息顺序?