0

我正在尝试按照以下步骤在我的本地计算机上以独立模式使用 MongoDB 设置 debezium 连接

我已经设置了一个 MongoDB 的副本集,其中包含 3 个节点、1 个主节点 2 个副本 (host = localhost, port = 27017, 27018, 27019) 。我已经在本地机器上启动了 kafka 和 zookeper 作为快速入门指南

在此之后,我从这里下载了 MongoDB 连接器插件 jar 。

将 plugin.path 变量设置为 mongodb 插件 jar 为:

plugin.path=dbz_connector_mongodb_jar_path,KAFKA_HOME/libs

为独立模式创建了连接器配置。

KAFKA_HOME/config/my_mongo_connector.properties:

{
  "name": "my-mongo-connector", 
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
    "mongodb.hosts": "rc0/127.0.0.1:27017", 
    "mongodb.name": "myMongoConnceter", 
    "collection.include.list": "dbname.collectionName" 
  }
}

现在,当我运行 kafka 时,请使用以下命令连接:

cd KAFKA_HOME    
bin/connect-standalone.sh config/connect-standalone.properties config/my_mongo_connector.properties

我看到以下错误:

java.lang.NoClassDefFoundError: com/mongodb/MongoException
    at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3137)
    at java.base/java.lang.Class.getConstructor0(Class.java:3342)
    at java.base/java.lang.Class.newInstance(Class.java:556)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:392)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:362)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
    at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:78)
Caused by: java.lang.ClassNotFoundException: com.mongodb.MongoException
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 13 more

然后我将mongo-java-driver-3.12.10.jar文件添加到具有此类的 kafka connect plugins 文件夹中com.mongodb.MongoException。但是,我仍然面临同样的错误。

我还发现这个 jar 在运行 kafka connect 命令时正在加载:

INFO Loading plugin from: debezium-connector-mongodb_path/mongo-java-driver-3.12.10.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
INFO Registered loader: PluginClassLoader{pluginLocation=file:debezium-connector-mongodb_path/mongo-java-driver-3.12.10.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)

编辑 :

我的 NoClassDefFoundError 问题通过更正 plugin.path 得到解决。

正如评论中所建议的那样,早些时候它是/opt/debezium-connector-mongodb ,我将其更改/opt为 。

我的另一个错误:我正在使用 json 格式的连接器属性文件,因此将其更改为属性文件格式:

name:my-mongo-connector
connector.class:io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=rc0/127.0.0.1:27017
mongodb.name=myMongoConnector
collection.include.list=dbname.collectionName
topic.creation.default.replication.factor=1
topic.creation.default.partitions=2
mongodb.members.auto.discover=false

现在 kafka 连接已启动,但在 mongo 集合中插入新条目时,未创建 kafka 主题。

所以现在的问题是即使在数据库中插入数据后也无法创建 kafka 主题。

4

0 回答 0