我正在尝试按照以下步骤在我的本地计算机上以独立模式使用 MongoDB 设置 debezium 连接器。
我已经设置了一个 MongoDB 的副本集,其中包含 3 个节点、1 个主节点 2 个副本 (host = localhost, port = 27017, 27018, 27019) 。我已经在本地机器上启动了 kafka 和 zookeper 作为快速入门指南。
在此之后,我从这里下载了 MongoDB 连接器插件 jar 。
将 plugin.path 变量设置为 mongodb 插件 jar 为:
plugin.path=dbz_connector_mongodb_jar_path,KAFKA_HOME/libs
为独立模式创建了连接器配置。
KAFKA_HOME/config/my_mongo_connector.properties:
{
"name": "my-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "rc0/127.0.0.1:27017",
"mongodb.name": "myMongoConnceter",
"collection.include.list": "dbname.collectionName"
}
}
现在,当我运行 kafka 时,请使用以下命令连接:
cd KAFKA_HOME
bin/connect-standalone.sh config/connect-standalone.properties config/my_mongo_connector.properties
我看到以下错误:
java.lang.NoClassDefFoundError: com/mongodb/MongoException
at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3137)
at java.base/java.lang.Class.getConstructor0(Class.java:3342)
at java.base/java.lang.Class.newInstance(Class.java:556)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:392)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:362)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:78)
Caused by: java.lang.ClassNotFoundException: com.mongodb.MongoException
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 13 more
然后我将mongo-java-driver-3.12.10.jar
文件添加到具有此类的 kafka connect plugins 文件夹中com.mongodb.MongoException
。但是,我仍然面临同样的错误。
我还发现这个 jar 在运行 kafka connect 命令时正在加载:
INFO Loading plugin from: debezium-connector-mongodb_path/mongo-java-driver-3.12.10.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
INFO Registered loader: PluginClassLoader{pluginLocation=file:debezium-connector-mongodb_path/mongo-java-driver-3.12.10.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
编辑 :
我的 NoClassDefFoundError 问题通过更正 plugin.path 得到解决。
正如评论中所建议的那样,早些时候它是/opt/debezium-connector-mongodb
,我将其更改/opt
为 。
我的另一个错误:我正在使用 json 格式的连接器属性文件,因此将其更改为属性文件格式:
name:my-mongo-connector
connector.class:io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=rc0/127.0.0.1:27017
mongodb.name=myMongoConnector
collection.include.list=dbname.collectionName
topic.creation.default.replication.factor=1
topic.creation.default.partitions=2
mongodb.members.auto.discover=false
现在 kafka 连接已启动,但在 mongo 集合中插入新条目时,未创建 kafka 主题。
所以现在的问题是即使在数据库中插入数据后也无法创建 kafka 主题。