问题标签 [mongodb-kafka-connector]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1284 浏览

mongodb - MongoDB Kafka 连接器未生成带有 Mongo 文档 ID 的消息密钥

我正在使用MongoDB Kafka 连接器的 beta 版本从 MongoDB 发布到 Kafka 主题。

消息在 Kafka 中生成,但当它应该是文档 ID 时,它们的键为空:

在此处输入图像描述

这是我的连接独立配置

mongodb 源属性

下面是一个消息字符串值的示例:

我尝试使用转换从值中提取 if,特别是从 documentKey 字段中提取:

但有一个例外:

有什么想法可以生成带有文档 ID 的密钥吗?

0 投票
0 回答
179 浏览

mongodb - 如何修复来自设置为源的 kafka-mongo-connector 的消息 ID

我正在尝试将 kafka-mongo-connector 添加到 kafka-connect 作为事件的来源。我正在设置这样的连接器:

数据进来了,但关键是奇怪的格式:{"_id": {"_data": "....:}}.

因此,我无法插入 Elasticsearch 接收器连接器。

有人有解决方案吗?

谢谢,

弗朗切斯科

0 投票
1 回答
625 浏览

mongodb - 使用 mongo-kafka 作为接收器连接器,如何将主题记录的值字段映射到另一个值?

我是 Kafka Connect 和 MongoDB 的新手。我在 Kafka 主题中有一条记录,其值为 ,{ "Id": "foo" }我希望将其作为文档存储在 mongo 的集合中时Id映射到。BAR预期结果为{ "BAR": "foo" }。我应该尝试什么,或者如何配置来做到这一点?

我用这个作为参考:https ://github.com/mongodb/mongo-kafka/blob/master/docs/sink.md

我尝试在此处添加 "field.renamer.mapping": "[{\"oldName\":\"Id\", \"newName\": \"BAR\"}]""field.renamer.mapping": "[{\"oldName\":\"value.Id\", \"newName\": \"BAR\"}]"类似于 mongo sink 创建的配置:https ://github.com/mongodb/mongo-kafka/blob/11bac7636f0d6b0e3313c84445777253d36c2042/docker/run.sh#L108 。请求顺利通过,在 mongodb 中创建记录,但未按预期映射。

0 投票
1 回答
774 浏览

mongodb - 使用 mongo-kafka 作为接收器连接器,如何将字段的值设置为 Date 类型?

我有一个 mongo sink 连接器以及一个模式注册表。

我将 mongo sink 连接器配置为访问类似于:https ://github.com/mongodb/mongo-kafka/blob/master/docs/sink.md#configuration-example-for-avro 的模式注册表

我按照以下方式创建了一个模式:https ://github.com/mongodb/mongo-kafka/blob/master/docs/sink.md#logical-types 。它看起来像这样:

但是,当记录通过时,数据看起来像这样:{ "myTimestampMillisField": 1572035138104 },而不是类似于 this 的东西{ "myTimestampMillisField": ISODate("2019-10-25T20:28:19.628Z") }

我检查了模式注册表以确保逻辑类型在那里并且看起来不错。

我不确定我做错了什么,或者是否有更好的方法在 mongo 中设置为 Date 类型。有任何想法吗?

0 投票
2 回答
1821 浏览

apache-kafka - 我们可以在 mongodb 中更新/更新记录吗?数据源是kafka

我们可以在 mongodb 中更新/更新记录,但是是否有任何方法或函数可以直接在 mongodb 中更新或更新文档,源系统是 kafka,目标是 mongodb。

0 投票
1 回答
429 浏览

mongodb - 将 cdc 限制为特定集合的 Debezium mongodb 连接器属性

我们可以限制我们在 debezium mongodb连接器配置中的连接器属性中检索的数据量吗?由于 debezium 在数据库中查找 cdc,根据我的理解,它适用于整个数据库,我找不到限制少数特定集合的方法。我在我的 debezium 连接器中使用 mongodb atlas 连接链接。

以下是我的连接器配置:

如何将其配置为仅查找我的数据库中的特定集合?

0 投票
1 回答
517 浏览

mongodb - 来自 Kafka 的 MongoSource 连接创建奇怪的 _data 键

我正在使用具有以下配置的 KafkaConnect - MongoSource:

出于某种原因,当我使用消息时,我得到了一个奇怪的键:

我不知道它是从哪里来的,我的 mongo 文档的 _id 是 UUID,当我使用消息时,我应该在我的消费者密钥上看到 documentKey 字段。

以下是连接器发布到 kafka 的消息示例:

0 投票
2 回答
703 浏览

mongodb - 如何告诉 MongoSource(使用 Kafka Connect)序列化的密钥

我正在使用 mongo 源来监听 mongo 更改流并将所有事件放入 kafka,但我正在绞尽脑汁地寻找一种从事件中提取“Real”键的方法。我尝试了转换,但它没有用,给了我错误:

在 Mongo 源代码中,我发现了这一

这基本上意味着它甚至没有一些密钥处理,而是寻找“_id”字段(这不是文档的id,它是一个恢复令牌信息)

相反,我想将主题的键设置为“documentKey”。

以下是连接器获取的事件示例:

我使用了以下配置:

我试过了:

还有 StringConverter (虽然我不认为这可以用字符串来完成)

有没有办法提取密钥?请注意:模式已禁用。

0 投票
1 回答
325 浏览

mongodb - 通过kafka mongo sink连接器在mongo中的内联json数组对象中附加字段值

发布第一个插入:

{"Customer_id": 2, "transaction_id": "1", "idd": [999, 1111], "id": 1}

然后是第二个:

{"Customer_id": 2, "transaction_id": "2", "idd": [9, 10], "id": 1}

要求的结果:

{"Customer_id": 2, "transaction_id": "2", "idd": [[9, 10] , [999, 1111]], "id": 1}

我得到了什么:

{"Customer_id": 2, "transaction_id": "2", "idd": [9, 10] , "id": 1}

更新策略此更新数组不追加,但我的预期结果追加数组

配置 :

我们如何通过 mongo-kafka sink 连接器做到这一点

0 投票
1 回答
2703 浏览

apache-kafka - 在 Kafka-Connect 中自动重新连接失败的任务

我正在使用带有 Kafka-connect 的 mongo-source 插件。我检查了任务状态,它正在运行并监听一个 mongo 集合。

我手动停止了 mongod 服务并等待了大约 1 分钟,然后我重新启动它。

我检查了源任务以查看是否有任何东西可以自行修复,30 分钟后似乎没有任何效果。

只有在重新启动连接器后它才重新开始工作。

由于 mongo-source 没有设置超时时重试 + 回退的选项,我搜索了一个适合简单场景的配置:使用 Kafka-connect 配置在 X 时间后重新启动失败的任务。找不到任何.. :/ 我可以用一个简单的脚本来做到这一点,但是 Kafka-connect 中必须有一些东西可以管理失败的任务。甚至在 mongo-source 中......我不希望它在 1 分钟后失败得这么快......:/