Kafka Elasticsearch connector "confluentinc-kafka-connect-elasticsearch-5.5.0" not working for me locally
java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableSet
	at io.searchbox.client.AbstractJestClient.<init>(AbstractJestClient.java:38)
	at io.searchbox.client.http.JestHttpClient.<init>(JestHttpClient.java:43)
	at io.searchbox.client.JestClientFactory.getObject(JestClientFactory.java:51)
	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:149)
	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:141)
	at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
	at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
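I am also using the mssql connector and the s3 connector plugins in the same plugin path; they work, but the elasticsearch plugin fails with a NoClassDefFoundError. Here is my folder structure in the worker: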
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls
confluentinc-kafka-connect-elasticsearch-5.5.0 confluentinc-kafka-connect-s3-5.5.0 debezium-connector-sqlserver kafka-connect-shell-sink-5.1.0
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls -l
total 16
drwxrwxr-x 2 root root 4096 May 25 22:15 confluentinc-kafka-connect-elasticsearch-5.5.0
drwxrwxr-x 5 root root 4096 May 15 02:26 confluentinc-kafka-connect-s3-5.5.0
drwxrwxr-x 2 root root 4096 May 15 02:26 debezium-connector-sqlserver
drwxrwxr-x 4 root root 4096 May 15 02:26 kafka-connect-shell-sink-5.1.0
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls debezium-connector-sqlserver
debezium-api-1.1.1.Final.jar debezium-connector-sqlserver-1.1.1.Final.jar debezium-core-1.1.1.Final.jar mssql-jdbc-7.2.2.jre8.jar
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls confluentinc-kafka-connect-s3-5.5.0
assets etc lib manifest.json
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls confluentinc-kafka-connect-elasticsearch-5.5.0 -l
total 8356
-rw-r--r-- 1 root root 17558 May 25 11:53 common-utils-5.5.0.jar
-rw-r--r-- 1 root root 263965 May 25 11:53 commons-codec-1.9.jar
-rw-r--r-- 1 root root 61829 May 25 11:53 commons-logging-1.2.jar
-rw-r--r-- 1 root root 79845 May 25 19:34 compress-lzf-1.0.3.jar
-rw-r--r-- 1 root root 241622 May 25 11:53 gson-2.8.5.jar
-rw-r--r-- 1 root root 2329410 May 25 19:34 guava-18.0.jar
-rw-r--r-- 1 root root 1140290 May 25 19:34 hppc-0.7.1.jar
-rw-r--r-- 1 root root 179335 May 25 11:53 httpasyncclient-4.1.3.jar
-rw-r--r-- 1 root root 747794 May 25 11:53 httpclient-4.5.3.jar
-rw-r--r-- 1 root root 323824 May 25 11:53 httpcore-4.4.6.jar
-rw-r--r-- 1 root root 356644 May 25 11:53 httpcore-nio-4.4.6.jar
-rw-r--r-- 1 root root 280996 May 25 19:34 jackson-core-2.8.2.jar
-rw-r--r-- 1 root root 22191 May 25 11:53 jest-6.3.1.jar
-rw-r--r-- 1 root root 276130 May 25 11:53 jest-common-6.3.1.jar
-rw-r--r-- 1 root root 621992 May 25 19:34 joda-time-2.8.2.jar
-rw-r--r-- 1 root root 62226 May 25 19:34 jsr166e-1.1.0.jar
-rw-r--r-- 1 root root 83179 May 25 11:53 kafka-connect-elasticsearch-5.5.0.jar
-rw-r--r-- 1 root root 1330394 May 25 19:34 netty-3.10.5.Final.jar
-rw-r--r-- 1 root root 41139 May 25 11:53 slf4j-api-1.7.26.jar
-rw-r--r-- 1 root root 49754 May 25 19:34 t-digest-3.0.jar
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$
I have read some posts saying that jar files/dependencies are missing from the elasticsearch connector, so I added them, as you can see above, but no luck.
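guava-18.0.jar in the listing above should normally contain com.google.common.collect.ImmutableSet, so one sanity check from inside the worker pod is to search every jar in the plugin directory for that exact class entry. This is a minimal sketch, assuming Info-ZIP unzip is available in the Connect image (not verified for this image; jar tf from a JDK lists entries the same way); the script name find-class.sh and the helper names are hypothetical.

```shell
#!/bin/sh
# Diagnostic sketch: report which jars under a plugin directory contain a given class.
# Assumes Info-ZIP `unzip` is installed in the image (an assumption, not verified).

# Convert a class name to the entry path used inside a jar, e.g.
# com.google.common.collect.ImmutableSet -> com/google/common/collect/ImmutableSet.class
class_to_entry() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Print every jar under directory $1 (searched recursively) that contains class $2.
find_class_in_jars() {
  entry=$(class_to_entry "$2")
  find "$1" -name '*.jar' -type f | sort | while IFS= read -r jar; do
    # `unzip -Z1` lists the archive's entry names, one per line.
    if unzip -Z1 "$jar" 2>/dev/null | grep -qxF "$entry"; then
      printf '%s\n' "$jar"
    fi
  done
}

# Only runs when invoked as: sh find-class.sh <plugin-dir> <class-name>
if [ "$#" -eq 2 ]; then
  find_class_in_jars "$1" "$2"
fi
```

Run from the plugins directory shown above, for example: sh find-class.sh confluentinc-kafka-connect-elasticsearch-5.5.0 com.google.common.collect.ImmutableSet. If guava-18.0.jar is intact and readable, it should be the jar printed.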
Here is my connector config:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: "elastic-files-connector"
  labels:
    strimzi.io/cluster: mssql-minio-connect-cluster
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    connection.url: "https://escluster-es-http.dev-kik.io:9200"
    connection.username: "${file:/opt/kafka/external-configuration/elasticcreds/connector.properties:connection.username}"
    connection.password: "${file:/opt/kafka/external-configuration/elasticcreds/connector.properties:connection.password}"
    flush.timeout.ms: 10000
    max.buffered.events: 20000
    batch.size: 2000
    topics: filesql1.dbo.Files
    tasks.max: '1'
    type.name: "_doc"
    max.request.size: "536870912"
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    schema.compatibility: NONE
    errors.tolerance: all
    errors.deadletterqueue.topic.name: "dlq_filesql1.dbo.Files"
    errors.deadletterqueue.context.headers.enable: "true"
    errors.log.enable: "true"
    behavior.on.null.values: "ignore"
    errors.retry.delay.max.ms: 60000
    errors.retry.timeout: 300000
    behavior.on.malformed.documents: warn
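Because the connector is created from a Strimzi KafkaConnector resource, the worker's Kafka Connect REST API is another place to check what was actually loaded: GET /connector-plugins lists the installed connector classes, and GET /connectors/<name>/status returns the connector and task states, including a failed task's stack trace. A rough sketch, assuming the API answers on http://localhost:8083 inside the pod (the Kafka Connect default port; override CONNECT_URL otherwise); connect-check.sh and the function names are hypothetical, and the grep is a plain substring match on the compact JSON, not real JSON parsing.

```shell
#!/bin/sh
# Diagnostic sketch: ask the Connect worker's REST API what it has loaded.
# Assumption: the REST API is reachable at CONNECT_URL (default port 8083).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Succeeds if GET /connector-plugins lists the given connector class.
# Substring match on the JSON array the worker returns; good enough for a quick check.
plugin_installed() {
  curl -s "$CONNECT_URL/connector-plugins" | grep -qF "\"class\":\"$1\""
}

# Print the raw status JSON (connector state plus per-task state and trace).
connector_status() {
  curl -s "$CONNECT_URL/connectors/$1/status"
}

# Only runs when invoked as: sh connect-check.sh <connector-name> <connector-class>
if [ "$#" -eq 2 ]; then
  if plugin_installed "$2"; then
    echo "plugin found: $2"
  else
    echo "plugin NOT found: $2"
  fi
  connector_status "$1"
fi
```

For example: sh connect-check.sh elastic-files-connector io.confluent.connect.elasticsearch.ElasticsearchSinkConnector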
I changed the username/password to plain text; no luck. I tried both http and https for the elasticsearch connection; no luck.
Here is my elasticsearch service info:
devadmin@vdi-mk2-ubn:~/kafka$ kubectl get svc -n elastic-system
NAME                     TYPE           CLUSTER-IP      EXTERNAL-IP      PORT(S)          AGE
elastic-webhook-server   ClusterIP      10.104.95.105   <none>           443/TCP          21h
escluster-es-default     ClusterIP      None            <none>           <none>           8h
escluster-es-http        LoadBalancer   10.108.69.136   192.168.215.35   9200:31214/TCP   8h
escluster-es-transport   ClusterIP      None            <none>           9300/TCP         8h
kibana-kb-http           LoadBalancer   10.102.81.206   192.168.215.34   5601:31315/TCP   20h
devadmin@vdi-mk2-ubn:~/kafka$
I can connect to the elasticsearch service from the Kafka Connect worker both ways, via the service's cluster IP and via its hostname:
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ curl -u "elastic:5NM0Pp25sFzNu578873BWFnN" -k "https://10.108.69.136:9200"
{
  "name" : "escluster-es-default-0",
  "cluster_name" : "escluster",
  "cluster_uuid" : "TP5f4MGcSn6Dt9hZ144tEw",
  "version" : {
    "number" : "7.7.0",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "81a1e9eda8e6183f5237786246f6dced26a10eaf",
    "build_date" : "2020-05-12T02:01:37.602180Z",
    "build_snapshot" : false,
    "lucene_version" : "8.5.1",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ curl -u "elastic:5NM0Pp25sFzNu578873BWFnN" -k "https://escluster-es-http.dev-kik.io:9200"
{
  "name" : "escluster-es-default-0",
  "cluster_name" : "escluster",
  "cluster_uuid" : "TP5f4MGcSn6Dt9hZ144tEw",
  "version" : {
    "number" : "7.7.0",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "81a1e9eda8e6183f5237786246f6dced26a10eaf",
    "build_date" : "2020-05-12T02:01:37.602180Z",
    "build_snapshot" : false,
    "lucene_version" : "8.5.1",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$
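No matter what I do, the exception does not change. I don't know what else to try; my brain is burning and I'm about to go crazy.
Am I missing something, or could you suggest how you run this connector on Kubernetes?
Thanks and regards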