问题标签 [mongodb-kafka-connector]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
161 浏览

mongodb - mongodb-sink-kafka-connector,BulkWriteError{code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}

当我尝试使用分片 MongoDB 集群来保存来自 kafka 的数据时,它会引发类似的错误

'com.mongodb.MongoBulkWriteException:服务器 192.168.50.117:20000 上的批量写入操作错误。写入错误:[BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]'

kafka:v2.7,
mongodb:v 4.2,
连接器类型:分布式,

我已尝试重置分片配置并重新启动 kafka 服务器/连接器服务器,但它不起作用
谢谢!

在此处输入图像描述

0 投票
1 回答
172 浏览

mongodb - Kafka MongoDB 从表中只接收一条记录

我正在使用 ksqlDB,我从流中创建了一个表。当我在该表中触发选择查询时,它会正确地为我提供所有记录。现在我想在 MongoDB 中下沉那个表。我还能够在 Kafka 表和 MongoDB 之间创建一个接收器。但不知何故,它只将一条记录沉入其中(MongoDB)。而在表中我有 100 条记录。下面是我的 MongoDB 接收器连接器。

我无法理解,这背后的原因是什么。任何帮助表示赞赏。谢谢

0 投票
1 回答
360 浏览

java - 更新 Kafka sink mongo 上的部分字段

我将 avro 架构与架构一起使用

在 Mongo 中,我的存在数据如下:

当我发送一个 Avro 对象时:Footballer player = Footballer.newBuilder().setId("2").setClub("MU").build();

我的预期结果:

但实际上:

这是我的 kafka 连接器接收器配置:

所以,请帮我正确配置

0 投票
0 回答
27 浏览

mongodb - 从 Mongo 到 Kafka 的数组

我可以使用 MongoExchangeSchema 写入 Kafka 主题数组而不是字符串吗?我想将我的收藏中的所有字段都发送到主题并使用此参数

我再次想使用 KSQL 从数组中提取一些字段。但是我的流没有像数组一样定义字段“联系人”。感谢您的任何想法!

我如何创建流

配置连接

Mongo 集合中的示例

0 投票
1 回答
57 浏览

mongodb - 覆盖 Mongo 源连接器

我将使用 Kafka 连接来使用来自 MongoDB 的消息并发布到 Kafka 主题中。

默认情况下,Mongo 源连接器为每个集合创建一个主题。但是我会有很多收藏,并且希望所有收藏只有一个主题。一条消息将具有集合名称。

  1. 覆盖 mongo-source 连接器是更好的方法吗?如果是这样,我应该记住什么
  2. 是否已经提供了任何设置?我知道collection在创建它时指定为空会监听所有集合。但它会为每个集合创建一个主题。
0 投票
0 回答
211 浏览

mongodb - 在独立模式下为 mongodb 运行 kafka connect 时出现 NoClassDefFoundError

我正在尝试按照以下步骤在我的本地计算机上以独立模式使用 MongoDB 设置 debezium 连接

我已经设置了一个 MongoDB 的副本集,其中包含 3 个节点、1 个主节点 2 个副本 (host = localhost, port = 27017, 27018, 27019) 。我已经在本地机器上启动了 kafka 和 zookeper 作为快速入门指南

在此之后,我从这里下载了 MongoDB 连接器插件 jar 。

将 plugin.path 变量设置为 mongodb 插件 jar 为:

为独立模式创建了连接器配置。

KAFKA_HOME/config/my_mongo_connector.properties:

现在,当我运行 kafka 时,请使用以下命令连接:

我看到以下错误:

然后我将mongo-java-driver-3.12.10.jar文件添加到具有此类的 kafka connect plugins 文件夹中com.mongodb.MongoException。但是,我仍然面临同样的错误。

我还发现这个 jar 在运行 kafka connect 命令时正在加载:

编辑 :

我的 NoClassDefFoundError 问题通过更正 plugin.path 得到解决。

正如评论中所建议的那样,早些时候它是/opt/debezium-connector-mongodb ,我将其更改/opt为 。

我的另一个错误:我正在使用 json 格式的连接器属性文件,因此将其更改为属性文件格式:

现在 kafka 连接已启动,但在 mongo 集合中插入新条目时,未创建 kafka 主题。

所以现在的问题是即使在数据库中插入数据后也无法创建 kafka 主题。

0 投票
0 回答
52 浏览

elasticsearch - 当我在我的 kafka 源连接器配置中使用“output.json.formatter”时,Mongo Elasticsearch Sink 连接器不起作用

我正在使用 Mongodb kafka 源连接器和同步连接器开发 Elasticsearch-mongo 同步连接器。

这是我的源连接器配置

这是我的弹性搜索接收器连接器配置

这工作正常,但在 Elasticsearch 长数据类型字段是这样的: -

但我希望输出看起来像这样:-

所以在我的 kafka 源连接器配置中,我添加了

就像这里提到的

添加后我的源连接器正在工作,我可以在“kafka-console-consumer.bat”中看到预期的输出,但是我的接收器连接器没有工作,它没有向 Elasticsearch 发送数据。

有人可以建议一些解决方案,以便我在 Elasticsearch 中获得 SimplifiesJson(即没有 "$numberLong", "$numberInteger" )。

编辑 这是我为 ElasticsearchSinkConnector 失败而获得的日志

0 投票
1 回答
43 浏览

apache-kafka - MongoSinkConnector:解析秘密引发验证异常

我正在尝试connection.uri通过以下示例解决使用 FileConfigProvider 的问题: https ://docs.confluent.io/platform/current/connect/security.html#externalizing-secrets

我有以下 POST 请求:

我收到以下错误:

似乎在解析秘密值之前执行了配置验证。并且,由于这个原因,该值"connection.uri": "${my-secret}"不是有效的 mongodb 连接字符串。

有没有可能解决这个问题?

源代码:

我使用docker-compose up.

MongoSinkConnector使用 kafka-connect REST 端点进行配置。

在此处输入图像描述

0 投票
1 回答
314 浏览

mongodb - Kafka Connect - MongoDB 源连接器 - 管道不工作

我正在使用 MongoDB 源连接器设置 Kafka 连接器。

配置如下所示:

如果我删除“管道”行,源连接器工作正常,但显然所有文档都会被推送到主题,这不是我想要的。

如果我添加回“管道”行,源连接器不会将任何消息推送到我的主题,我不明白为什么。我错过了什么?这是我们 mongo 中的文档的样子:

配置正在通过 rest api 推送,这就是为什么它具有带有所有转义字符 (\") 的“字典”外观。

谢谢。

0 投票
0 回答
20 浏览

changestream - copy.existing 选项使用 mongo 源连接器提取的最大过去数据

我打算使用 KAFKA-CONNECT 在两个系统之间同步数据。MongoDB 作为源,copy.existing 作为连接器配置之一来同步过去的数据。

我知道更改流可以使用此配置提取过去的数据。我们有大约 34GB 的数据,我们有去年的数据。更改流可以从头开始提取数据吗?更改流的旧数据有多长时间?