1

我已经安装了 confluent_3.3.0 并启动了 zookeper、schema-registry 和 kafka broker。并从以下链接下载了 couchbase 连接器 https://github.com/couchbase/kafka-connect-couchbase

使用以下命令运行接收器连接器

./bin/connect-standalone etc/kafka/connect-standalone.properties /home/nayangiri/couch-connect-test/kafka-connect-couchbase/config/quickstart-couchbase-sink.properties

运行连接器后,我开始使用 kafka-python 库发布 JSON。

问题是,连接器在没有转储所有已发布消息的情况下断开连接并出现以下错误

[2017-11-07 20:12:39,815] WARN This transcoder (JsonBinaryTranscoder) does not support mutation tokens - this method is a stub and needs to be implemented on custom transcoders. (com.couchbase.client.java.transcoder.AbstractTranscoder:150)
[2017-11-07 20:12:44,821] WARN This transcoder (JsonBinaryTranscoder) does not support mutation tokens - this method is a stub and needs to be implemented on custom transcoders. (com.couchbase.client.java.transcoder.AbstractTranscoder:150)
[2017-11-07 20:12:44,821] WARN This transcoder (JsonBinaryTranscoder) does not support mutation tokens - this method is a stub and needs to be implemented on custom transcoders. (com.couchbase.client.java.transcoder.AbstractTranscoder:150)
[2017-11-07 20:12:44,823] ERROR Task test-couchbase-sink-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
com.couchbase.client.java.error.CannotRetryException: maximum number of attempts reached after 5 retries
    at com.couchbase.client.java.util.retry.RetryWithDelayHandler.call(RetryWithDelayHandler.java:101)
    at com.couchbase.client.java.util.retry.RetryWithDelayHandler.call(RetryWithDelayHandler.java:42)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
    at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:252)
    at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:323)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:302)
    at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:284)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:135)
    at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext(SubjectSubscriptionManager.java:253)
    at rx.subjects.BehaviorSubject.onNext(BehaviorSubject.java:160)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
    at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
    at rx.internal.operators.OnSubscribeRedo$2$1.onError(OnSubscribeRedo.java:237)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:818)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:579)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:852)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onError(OnSubscribeMap.java:88)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73)
    at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
    at rx.internal.producers.SingleProducer.request(SingleProducer.java:65)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103)
    at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:390)
    at com.couchbase.client.core.endpoint.AbstractGenericHandler.access$000(AbstractGenericHandler.java:72)
    at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:408)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
    at com.couchbase.connect.kafka.util.JsonBinaryTranscoder.newDocument(JsonBinaryTranscoder.java:40)
    at com.couchbase.connect.kafka.util.JsonBinaryTranscoder.newDocument(JsonBinaryTranscoder.java:30)
    at com.couchbase.client.java.transcoder.AbstractTranscoder.newDocument(AbstractTranscoder.java:133)
    at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:568)
    at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:560)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
    ... 19 more
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73)
    ... 19 more
[2017-11-07 20:12:44,830] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2017-11-07 20:12:44,830] ERROR Task test-couchbase-sink-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2017-11-07 20:12:44,831] **ERROR Task is being killed and will not recover until manually restarted** (org.apache.kafka.connect.runtime.WorkerTask:149)
[2017-11-07 20:12:44,836] INFO Closed bucket test (com.couchbase.client.core.config.ConfigurationProvider:115)
[2017-11-07 20:12:44,836] INFO Disconnected from Node 10.103.2.76/localhost (com.couchbase.client.core.node.Node:115)

[2017-11-07 20:12:44,839] INFO [null][KeyValueEndpoint]:从 Channel 收到通知为不活动,正在尝试重新连接。(com.couchbase.client.core.endpoint.Endpoint:115)

感谢您的阅读

4

1 回答 1

1

Thanks for raising this issue. This is a regression in version 3.2.0 of the connector. It is being tracked as KAFKAC-83.

The fix is included in version 3.2.1, scheduled for release on November 21, 2017 released on November 8, 2017.

In the meantime you may wish to temporarily downgrade to version 3.1.3, or build the connector from the latest source code.

PSA: The Couchbase forums have a dedicated section for discussion related to the Kafka connector.

于 2017-11-07T19:41:10.500 回答