
Description

  • I have a pubSubSource connector running in Kafka Connect distributed mode that simply reads from a Pub/Sub subscription and writes to a Kafka topic. The issue is that even when I publish a single message to GCP Pub/Sub, the message is written twice to my Kafka topic.

How to reproduce

  • Deploy Kafka and Kafka Connect

  • Create a connector with the following pubSubSource configuration:

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
      "name": "pubSubSource",
      "config": {
        "connector.class":"com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "tasks.max":"1",
        "cps.subscription":"pubsub-test-sub",
        "kafka.topic":"kafka-sub-topic",
        "cps.project":"test-project123",
        "gcp.credentials.file.path":"/tmp/gcp-creds/account-key.json"
      }
    }'
    
  • The Kafka Connect worker configuration is as follows:

    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components"
    "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    
  • Publish a message to the Pub/Sub topic using the following command:

    gcloud pubsub topics publish test-topic --message='{"someKey":"someValue"}'
    
  • Read messages from the destination Kafka topic:

    /usr/bin/kafka-console-consumer --bootstrap-server xx.xxx.xxx.xx:9092 --topic kafka-sub-topic --from-beginning
    
    # Output
    {"someKey":"someValue"}
    {"someKey":"someValue"}
    

Why is this happening? Is there something I am doing wrong?


1 Answer


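I found the following information at https://cloud.google.com/pubsub/docs/faq, and it looks like you are facing the same issue. Could you try producing larger messages and see whether the result is the same?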
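One way to run that experiment is to generate a payload of a chosen size and publish it with the same gcloud command used in the question. This is a minimal sketch: `make_payload` is a hypothetical helper, and the 100000-byte size in the usage line is an arbitrary example (keep a single command-line argument well below your OS limit, roughly 128 KiB per argument on Linux).

```shell
# Hypothetical helper: print a JSON object whose "someKey" value is N 'x' characters.
make_payload() {
  n="$1"
  # head -c emits N NUL bytes; tr turns each one into an 'x'.
  filler=$(head -c "$n" /dev/zero | tr '\0' 'x')
  printf '{"someKey":"%s"}' "$filler"
}

# Usage (requires gcloud and the test-topic from the question):
# gcloud pubsub topics publish test-topic --message="$(make_payload 100000)"
```

If large messages arrive once while small ones arrive twice, that points toward the batching behaviour described in the FAQ below.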

Details from the link:

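Why are there too many duplicate messages? Pub/Sub guarantees at-least-once message delivery, which means that occasional duplicates are to be expected. However, a high rate of duplicates may indicate that the client is not acknowledging messages within the configured ack_deadline_seconds, and Pub/Sub is retrying the message delivery. This can be observed in the monitoring metrics pubsub.googleapis.com/subscription/pull_ack_message_operation_count for pull subscriptions, and pubsub.googleapis.com/subscription/push_request_count for push subscriptions. Look for elevated expired or webhook_timeout values in the /response_code. This is particularly likely if there are many small messages, since Pub/Sub may batch messages internally and a partially acknowledged batch will be fully redelivered.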

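Another possibility is that the subscriber is not acknowledging some messages because the code path processing those specific messages fails, and the Acknowledge call is never made; or the push endpoint never responds, or responds with an error.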
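To tell occasional at-least-once duplicates apart from a systematic problem (every message arriving twice, as in the question), you can count repeated records in a captured consumer dump. A minimal sketch, assuming the console-consumer output has been saved one record per line to a file; `count_repeats` and `dump.txt` are hypothetical names. Identical payloads can also be genuinely separate publishes, so treat the result as a hint rather than proof.

```shell
# Hypothetical helper: for every line that occurs more than once on stdin,
# print "<count><TAB><line>", most-repeated first.
count_repeats() {
  awk '{ seen[$0]++ }
       END { for (line in seen) if (seen[line] > 1) printf "%d\t%s\n", seen[line], line }' |
    sort -t "$(printf '\t')" -k1,1nr
}

# Usage (dump.txt is a hypothetical capture of kafka-console-consumer output):
# count_repeats < dump.txt
```

A count of exactly 2 for every record suggests something deterministic (for example a whole batch being redelivered) rather than sporadic retries.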

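How do I detect duplicate messages? Pub/Sub assigns a unique message_id to each message, which can be used to detect duplicate messages received by the subscriber. This will not, however, allow you to detect duplicates resulting from multiple publish requests on the same data. Detecting those requires the publisher to provide a unique message identifier. See Pub/Sub I/O for further discussion.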
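Deduplicating by message_id on the consuming side can be sketched as follows. This assumes each record has already been rendered as `<message_id><TAB><payload>`; whether and how the message_id reaches Kafka depends on your connector configuration, so that input format is an assumption, and `dedup_by_id` is a hypothetical helper. As the FAQ notes, two separate publishes of the same data get different IDs, so both are kept.

```shell
# Hypothetical helper: keep only the first record seen for each message_id.
# Input lines are assumed to look like "<message_id><TAB><payload>".
dedup_by_id() {
  # seen[id]++ evaluates to 0 (false) the first time an id appears,
  # so !seen[id]++ is true exactly once per id and that line is printed.
  awk -F '\t' '!seen[$1]++'
}

# Usage (messages.tsv is a hypothetical file in the format above):
# dedup_by_id < messages.tsv
```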

answered 2021-03-02T14:02:35.187