问题标签 [mongodb-kafka-connector]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
949 浏览

apache-kafka - 如何在带有 MongoDB 的 Kafka Connect Sink 连接器中获取 kafka 消息的标头

如何使用 Kafka Connect 从 kafka 消息中检索传入的标头,以将它们存储为使用 MongoDB Sink Connector 到 mongodb 的附加数据字段。

我有一个卡夫卡主题“PROJECT_EXAMPLE_TOPIC”。如您所见,我已经能够保存 msg 时间戳、传入消息数据和 mongo 文档创建/更新日期。

我想有一个函数可以在某处提取标题。

示例卡夫卡值

  1. 如何获得原始标题header_foo
  1. 如何获取所有卡夫卡标题?

有我的配置

0 投票
0 回答
180 浏览

mongodb - Kafka Connect 和 Mongo Source Connector 与管道中的查找聚合阶段

我在 mongoDB 中有以下三个集合:

我还使用以下 mongo 管道在欢乐时光集合上提交了一个 kafka 连接器实例。

当我查看 kafka-connect 日志时,我看到以下消息:

WARN 无法恢复更改流:$changeStream 管道中不允许 $lookup 20

并且消息没有发布到主题。

我真正的用例是将大型 mongodb 文档插入到饮酒者和饮料集合中,并且我想将较小的文档插入到欢乐时光集合中。最终消费者需要饮酒者和饮料信息。

我是否有其他方法可以检索饮酒者和饮料信息并获取发布到该主题的详细消息?

0 投票
2 回答
822 浏览

mongodb - 无法使用 Apache Kafka 使用 MongoDb 插件启动 Kafka Connect

我是 Kafka 的新手,我想看看是否可以使用 Kafka 将 MongoDb 数据与另一个系统同步。

我的设置:

  1. 我正在运行 AWS MSK 集群,并且手动创建了一个带有 Kafka 客户端的 EC2 实例。
  2. 我已将 MongoDB Kafka Connect 插件添加到/usr/local/share/kafka/plugins.
  3. 我正在运行 Kafka 连接,可以看到它加载了插件
  1. 解压插件有这个结构

这个插件来自融合页面,我也尝试从 Maven 页面下载它。问题是当我运行 Kafka Connect 时它失败了,因为插件缺少 Java 依赖项。

我的印象是插件应该在 jar 文件/usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/mongo-kafka-1.3.0-all.jar中而不是在 Java SDK 中查找依赖项。

我在这个设置中缺少什么?

0 投票
1 回答
358 浏览

mongodb - Mongodb Kafka Connector如何查看多个集合

我正在尝试使用 Mongo Kafka 连接器捕获 MongoDb 更改数据。当我将集合名称(即as)放入时它可以工作collection=collection1,但是当我将集合留空并使用以下内容时我无法让它工作pipeline[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]

这是属性文件的样子:

我从运行中收到以下消息bin/connect-standalone.sh

我正在使用 mongodbv3.6

0 投票
0 回答
80 浏览

mongodb - 有一种开箱即用的方法可以将 SimplifiedJson 转换为 ExtendedJson?

我有一个配置了 MongoDB Kafka 连接器 v1.3 output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson,所以我的 UUID 字段输出如下"_id":"MSRJCs07SFy4sMpopdRvEA=="

但是,该 Kafka 消息的一个客户端需要将该 SimplifiedJson 转换为有效的 ExtendedJson,并从该 ExtendedJson 实例化一个 org.bson.BsonDocument(mongodb Java SDK),因此 _id 应该被识别为具有值的 BsonBinary(UUID) 3124490a-cd3b-485c-b8b0-ca68a5d46f10。

直接从 SimplifiedJson 尝试 BsonDocument.parse,该值"_id":"MSRJCs07SFy4sMpopdRvEA=="被视为 BsonString。

客户端无法手动执行此操作,因为该字段没有标准,可能是对象 id、uuid 或任何其他对生成该数据的应用程序有意义的东西。

那么,MongoDB 库中是否有任何内置方法可以将 SimplifiedJson 转换为具有相同原始字段类型(对象 id、二进制、日期、数字等)的 Bson?

0 投票
0 回答
359 浏览

apache-kafka - 使用 MongoSinkConnector 删除文档

我可以在 mongo 中插入/更新文档,但我正在努力删除文档。

这是从 SQL Server 源的 debezium 连接在 kafka 主题中记录数据的方式(最后一行是 DELETE 操作的样子):

在此示例中,即使在 NULL 值之后,具有 user_code 1003 的文档仍在 MongoDB 中。

下面是我配置 MongoSinkConnector 的方式(我已经尝试了这两种方法mongodb.delete.on.null.valuesdelete.on.null.values但都没有奏效):

我也尝试过使用 PartialValueStrategy 但没有运气。

PS:我正在为我的接收器连接器使用 confluentinc/cp-kafka-connect docker 映像。

0 投票
2 回答
258 浏览

mongodb - 带有 kafka 和 mongoDB 连接器的发件箱模式

我想使用Mongo-Kafka 连接器在我们的微服务中实现发件箱模式,在我的发件箱(一个 MongoDB 集合)中,我使用以下字段存储主题数据: , , , , 。kafka_topickafka_keykafka_valuekafka_header_keyskafka_header_values

但是我应该如何配置 mongo-kafka 连接器以从字段中动态选择主题kafka_topic以及来自其他发件箱字段的值和标题?我无法从其配置参考中找到执行此操作的设置。

我还在考虑从 mongo-kafka 连接器存储库中获取一个分支,并使用我的自定义实现对其进行扩展。

如果您能帮助我,我将不胜感激。

0 投票
1 回答
424 浏览

mongodb - 在 MongoDB Kafka 源连接器中配置管道

我正在使用以下源连接器配置来过滤和读取来自 MongoDB 的状态为“PENDING”的特定记录。只需要轮询所有插入/更新为 PENDING 状态的记录。如果排除管道,源连接器能够轮询所有记录。有人可以帮我理解如何解决这个问题,还有没有办法知道轮询已经完成,就像批处理作业完成一样,以协调 kafka 消费者的另一个进程?

0 投票
1 回答
494 浏览

mongodb - 主题未看到 Mongodb Kafka 消息

我遇到了我的主题,尽管运行和操作没有注册我的 MongoDB 中发生的事件。

每次我插入/修改记录时,我都不再从kafka-console-consumer命令中获取日志。

有没有办法清除卡夫卡的缓存/偏移量?源和接收器连接已启动并正在运行。整个集群也很健康,事情是一切都照常工作,但每隔几周我就会看到这种情况再次出现,或者当我从其他位置登录到我的 Mongo 云时。

--partition 0参数没有帮助,也更改retention_ms1

在此处输入图像描述

在此处输入图像描述

我检查了两个连接器的状态并得到RUNNING

curl localhost:8083/connectors | jq 在此处输入图像描述

curl localhost:8083/connectors/monit_people/status | jq 在此处输入图像描述

运行docker-compose logs connect我发现:

0 投票
1 回答
223 浏览

kubernetes - Kafka Connect Mongodb 源连接器的高可用性配置

我一直在寻找有关 Kafka Connect 连接器的高可用性部署的具体信息,但一无所获。

就我而言,我使用Confluent Helm chart部署了一个 Mongodb 源连接器。此图表支持设置副本数。

设置replicaCount为大于 1 的值是否足够或还有其他因素需要考虑 ( tasks.max, ...)?