5

我是 Kafka 的新手,我想看看是否可以使用 Kafka 将 MongoDb 数据与另一个系统同步。

我的设置:

  1. 我正在运行 AWS MSK 集群,并且手动创建了一个带有 Kafka 客户端的 EC2 实例。
  2. 我已将 MongoDB Kafka Connect 插件添加到/usr/local/share/kafka/plugins.
  3. 我正在运行 Kafka 连接,可以看到它加载了插件
./bin/connect-standalone.sh ./config/connect-standalone.properties /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSourceConnector.properties
[2020-10-17 13:57:22,304] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-10-17 13:57:22,305] INFO Added plugin 'com.mongodb.kafka.connect.MongoSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
[2020-10-17 13:57:22,305] INFO Added plugin 'com.mongodb.kafka.connect.MongoSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
  1. 解压插件有这个结构
Archive:  mongodb-kafka-connect-mongodb-1.3.0.zip
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSourceConnector.properties  
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSinkConnector.properties  
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/README.md  
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/LICENSE.txt  
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/manifest.json  
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/mongo-kafka-1.3.0-all.jar  
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/mongodb-leaf.png  
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/mongodb-logo.png  

这个插件来自融合页面,我也尝试从 Maven 页面下载它。问题是当我运行 Kafka Connect 时它失败了,因为插件缺少 Java 依赖项。

[2020-10-17 13:57:24,898] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/avro/Schema
    at com.mongodb.kafka.connect.source.MongoSourceConfig.createConfigDef(MongoSourceConfig.java:591)
    at com.mongodb.kafka.connect.source.MongoSourceConfig.<clinit>(MongoSourceConfig.java:293)
    at com.mongodb.kafka.connect.MongoSourceConnector.config(MongoSourceConnector.java:91)
    at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
    at com.mongodb.kafka.connect.MongoSourceConnector.validate(MongoSourceConnector.java:51)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:313)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
Caused by: java.lang.NoClassDefFoundError: org/apache/avro/Schema
    ... 8 more
Caused by: java.lang.ClassNotFoundException: org.apache.avro.Schema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 8 more

我的印象是插件应该在 jar 文件/usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/mongo-kafka-1.3.0-all.jar中而不是在 Java SDK 中查找依赖项。

我在这个设置中缺少什么?

4

2 回答 2

5

快速浏览一下应该会告诉您错误是否正确...

jar -tf  mongo-kafka-1.3.0-all.jar | grep avro

如果该 JAR 本身不捆绑 Avro,那么 MSK 很可能不会像 Confluent Platform 那样包含 Avro(我假设 Mongo 主要捆绑了他们的连接器)。至少,Avro 不是 Apache Kafka 的依赖项,因此可以解释该错误。

您将需要下载 Avro JAR 并将其放在您的 Kafka Connect 类路径中(或至少在该 lib 文件夹中)

于 2020-10-20T07:44:22.067 回答
1

在本地运行时,我遇到了同样的问题。我从不再提供 uber jar 的 confluent 平台下载了 jar(mongo-kafka-connect-1.6.0-confluent.jar)。所以我搜索了 uber jar 并找到了下面的站点,我可以从哪里下载 uber jar(在下载下拉列表中选择全部)并解决了这个问题。

https://search.maven.org/search?q=a:mongo-kafka-connect

于 2021-07-25T13:58:42.613 回答