问题标签 [mongodb-kafka-connector]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
873 浏览

mongodb - Error: Configuration is not defined: topic

My Kafka Connect is sitting in a Kubernetes cluster which has the mongo-kafka-connect-1.1.0-all.jar in the connectors folder.

I have port forwarded the Kafka Connect service to my localhost and am trying to upload the properties for a Mongo Sink Connector with this curl command (obviously, with my own connection details in the relevant <> places):

However, I get the following error response:

Is my config wrong, am I missing something?

0 投票
2 回答
705 浏览

apache-kafka - Kafka 连接器始终作为单个任务工作

在此处输入图像描述

我正在尝试使用 kafka 将数据从 oracleDB 传输到 mongoDB。所以我像上图一样配置了kafka集群。我知道调整分区和 tasks.max 允许并行处理。但是,当我运行连接器时,它始终作为单个任务运行,不能并行处理。我需要做任何额外的设置吗?

这是我配置的。

  1. 主题创建

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092,127.0.0.2:9092,127.0.0.1:9093 --partitions 3 --topic topicA

  1. 连接器配置

    /li>
0 投票
0 回答
445 浏览

mongodb - Mongo-Kafka 源连接器无法恢复更改流

我正在使用Mongo-Kafka创建源连接器。(版本:mongo-kafka-1.0.1-all.jar)

但我得到了日志:

日志显示它未能恢复更改流。

但是我的mongodb版本是3.6.6

帖子内容如下图所示:

知道如何解决这个问题吗?

0 投票
0 回答
906 浏览

mongodb - Kafka将来自mongodb的流数据连接到elasticsearch

我正在尝试使用 kafka connect 将数据从 mongodb 流式传输到 elasticsearch。

通过 mongodb 连接器流入 kafka 的数据如下所示

将数据保存到elasticsearch时我有以下两个问题

  1. _id 是一个对象,我想在 elasticsearch 中使用“documentId”键作为 _id
  2. 日期是一个带有 $date 键的对象,我不知道如何转换为正常日期。

任何人都可以请我就上述两个问题指出正确的方向。

Mongodb源配置

弹性水槽配置

例外

连接器链接:

0 投票
1 回答
219 浏览

mongodb - 如何将 MessagePack 编码的消息从 Kafka 下沉到 MongoDB

我有一个 Kafka 主题,其中的值是 MessagePack 编码的。

有没有办法使用 MongoDB Kafka 连接器将来自该主题的记录下沉到 MongoDB 中,或者必须将记录值简单地存储为 JSON?

0 投票
1 回答
513 浏览

mongodb - 具有多个任务的分布式官方 Mongodb Kafka 源连接器不起作用

我在我的 Windows 机器上运行 Apache Kafka,有两个 Kafka-Connect-Workers(端口 8083、8084)和一个带有三个分区的主题(一个复制)。我的问题是,每当我关闭其中一个 Kafka-Connect 工作人员时,我都能看到故障转移到其他工作人员,但由于任务数始终为 ONE,因此没有发生负载平衡。我使用官方 MongoDB-Kafka-Connector 作为 Source(ChangeStream),tasks.max=6。我尝试使用多个线程更新 MongoDB,以便它可以将更多数据推送到 Kafka-Connect,并可能使 Kafka-Connect 创建更多任务。即使在数据量更大的情况下,任务计数仍然是一个。

我如何确认只有一项任务正在运行?那是通过 api "http://localhost:8083/connectors/mongodb-connector/status" : 响应: { "name":"mongodb-connector", "connector": { "state":"RUNNING", "worker_id":"xx.xx.xx.xx:8083" } "tasks": [ { "id": 0, "state": "RUNNING" "worker_id": "xx.xx.xx.xx:8083" } ], "type": "source" } 我在这里遗漏了什么吗?为什么没有创建更多任务?

0 投票
0 回答
441 浏览

mongodb - 无法恢复 Kafka MongoDB 源连接器

我正在使用带有融合平台 v5.4.1 和 MongoDB v3.6 ReplicaSet 的 Kafka MongoDB 源连接器 [https://www.confluent.io/hub/mongodb/kafka-connect-mongodb]。Kafka MongoDB 源连接器已被删除,现在当它在一个月后再次重新创建时,当我收到以下错误时。

在搜索了这个错误的原因后,我明白在 Oplog 中找不到恢复令牌,因为 Oplog 是内存/大小上限,它会清除旧信息。我也明白,为了尽量减少这个问题的发生,我应该增加 Oplog 大小等。但我想知道是否有可能从 Kafka/Confluent 平台方面解决这个问题?就像我可以删除 Kafka 主题、KSQL 主题一样,因为我正在使用主题“myDBName.myCollectionNamedata”、与主题关联的数据创建流,或者在 Kafka Connect 中执行一些操作,以便 MongoDB 源连接器开始捕获更改在 MongoDB Collections 中再次从当前时间丢弃旧信息?

0 投票
1 回答
612 浏览

mongodb - Kakfa-MongoDB 源连接器流中缺少恢复令牌

我正在使用 Confluent Platform v5.4、Mongo DB 3.6 版和 Kafka Mongo DB 连接器。我已将 Kafka Mongo DB 源连接器配置为在 Mongo DB 集合中创建新记录时将数据从 Mongo DB 推送到 Kafka 主题。我已将我的 Kafka 代理配置为将数据日志保留 1 天,之前设置为默认配置 7 天。现在我的问题是,我在 Mongo DB 日志中反复出现这样的错误 -

我知道在 Kafka 主题和 Mongo DB 的 Oplog 之间,有一个共享的恢复令牌以保持数据流正常运行。但是有时 Oplog 会被刷掉(因为 Oplog 的大小只能是分配给 Mongo DB 的内存的一定百分比)或者对应于 Kafka 主题的数据被删除,从而导致流的破裂。

我的问题是-如何避免流中断?如何确保恢复令牌始终存在于 Oplog 和 Kafka 主题中?有什么方法可以从可用的地方手动获取恢复令牌并在丢失的地方更新恢复令牌?

0 投票
1 回答
209 浏览

mongodb-kafka-connector - 官方 MongoDB Kafka 源连接器未发布干净的扩展 JSON

我已经设置了一个非常简单的mongo kafka 源连接器来将 mongo 的 oplog 流式传输到 kafka。但是,我看到在连接器发布的消息中,序列化的 oplog 事件不遵守扩展的 JSON 规范;例如,日期时间字段表示为:

当规范说它应该被格式化为:

为什么我没有得到干净的扩展 JSON?

注意:我的连接器配置文件如下所示:

0 投票
2 回答
256 浏览

apache-spark - 是否可以使用 Spark 结构化流中的 foreachBatch 将两个不相交的数据集写入数据同步?

我正在尝试将数据从单一来源写入多个 DataSink(Mongo 和 Postgres DB)。传入数据

问题是,我可以看到 Spark 正在打开两个 Streams 并两次读取相同的事件。是否可以读取一次并应用不同的转换并写入不同的集合?