
I am trying to implement node-rdkafka in our service, but I keep running into the error Broker: Unknown member. A similar issue on GitHub is https://github.com/confluentinc/confluent-kafka-dotnet/issues/1464, where they say the consumer is retrying or delayed while reusing the same group id, but I cannot find any retry or delay in my code. There is also https://github.com/confluentinc/confluent-kafka-python/issues/1004, but I have re-checked all of my consumer group IDs and each one is unique.

The node-rdkafka producer configuration is as follows:

    this.producer = new Producer({
      "client.id": this.cliendID,
      "metadata.broker.list": this.brokerList,
      "compression.codec": "lz4",
      "retry.backoff.ms": 200,
      "socket.keepalive.enable": true,
      "queue.buffering.max.messages": 100000,
      "queue.buffering.max.ms": 1000,
      "batch.num.messages": 1000000,
      "transaction.timeout.ms": 2000,
      "enable.idempotence": false,
      "max.in.flight.requests.per.connection": 1,
      "debug": this.debug,
      "dr_cb": true,
      "retries": 0,
      "log_cb": (_: any) => console.log(`log_cb =>`, _),
      "sasl.username": this.saslUsername,
      "sasl.password": this.saslPassword,
      "sasl.mechanism": this.saslMechanism,
      "security.protocol": this.securityProtocol
    }, {
      "acks": -1
    })

The node-rdkafka consumer configuration is as follows:

    this.consumer = new KafkaConsumer({
      "group.id": this.groupID,
      "metadata.broker.list": this.brokerList,
      "sasl.username": this.saslUsername,
      "sasl.password": this.saslPassword,
      "enable.auto.commit": false,
      "auto.commit.interval.ms": 2000,
      "session.timeout.ms": 45000,
      "max.poll.interval.ms": 300000,
      "heartbeat.interval.ms": 3000,
      "api.version.request.timeout.ms": 10000,
      "max.in.flight.requests.per.connection": 1,
      "debug": this.debug,
      "sasl.mechanism": this.saslMechanism,
      "log.connection.close": true,
      "log.queue": true,
      "log_level": 7,
      "log.thread.name": true,
      "isolation.level": "read_committed",
      "ssl.ca.location": "/etc/ssl/certs/",
      "log_cb": (_: any) => console.log(`log_cb =>`, _),
      "security.protocol": this.securityProtocol
    }, {})

    await new Promise(resolve => {
      this.consumer?.connect()
      this.consumer?.on('ready', () => {
        try {
          this.consumer?.subscribe(subscriptions)
          this.consumer?.consume()    // flowing mode: 'data' fires per message
          console.log('[SUCCESS] Subscribe Event => all event')
        } catch (err) {
          console.log('[FAILED] Subscribe => all event')
          console.log(err)
        }
        resolve(this.consumer)
      }).on('data', async (data) => {
        // dispatch to the handler registered for this topic
        this.topicFunctionMap[data.topic]({
          partition: data.partition,
          topic: data.topic,
          message: {
            key: data.key,
            offset: data.offset.toString(),
            size: data.size,
            value: data.value,
            timestamp: data.timestamp?.toString()
          }
        } as ISubsCallbackParam)

        // manual commit after each message (enable.auto.commit is false)
        this.consumer?.commitSync({
          topic: data.topic,
          offset: data.offset,
          partition: data.partition
        })
      })
    })
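One detail worth flagging in the snippet above: commitSync throws when the member has already been fenced by the broker (the Unknown member case), which surfaces inside the 'data' handler. A minimal sketch of a guard that treats rebalance-related commit failures as non-fatal; the safeCommit helper and the error-code set are my own illustration, not part of node-rdkafka's API:

```typescript
// Hypothetical guard: swallow commit errors that only mean "this member
// was rebalanced away"; rethrow anything else. The numeric codes are
// Kafka's UNKNOWN_MEMBER_ID (25) and ILLEGAL_GENERATION (22), which
// librdkafka reports on err.code.
const REBALANCE_ERRORS = new Set([22, 25]);

function safeCommit(commit: () => void): boolean {
  try {
    commit();
    return true;   // commit went through
  } catch (err: any) {
    if (REBALANCE_ERRORS.has(err?.code)) {
      // Safe to skip: the partition was revoked, so the uncommitted
      // offset will simply be re-consumed by the new assignee.
      console.warn('[COMMIT_SKIPPED] member fenced, code', err?.code);
      return false;
    }
    throw err;     // genuinely unexpected failure
  }
}

// Usage inside the 'data' handler would look like:
//   safeCommit(() => this.consumer?.commitSync({ topic, partition, offset }))
```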

With these configurations the consumer is able to receive events, but it does not last long: after roughly 2 hours it starts throwing these errors at random. I am not sure whether the cause is the manual commit or our handler taking too long, because I have already tried both the async and the sync commit, so the problem should not come from commitSync depending on our handler.

I say this because our handler can take a long time, which could get our consumer kicked out of the group. After I noticed the additional error Broker: Specified group generation id is not valid, that became my main suspect.
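To test the "handler takes too long" theory, one could time each message's processing and compare it against max.poll.interval.ms (300000 ms in the config above). A minimal sketch, where the timed wrapper is hypothetical, not a node-rdkafka API:

```typescript
// Hypothetical timing wrapper: warns when a single message's processing
// gets anywhere near max.poll.interval.ms, at which point the broker
// would evict the consumer from the group.
const MAX_POLL_INTERVAL_MS = 300000;

async function timed<T>(label: string, fn: () => Promise<T>): Promise<T> {
  const start = Date.now();
  try {
    return await fn();
  } finally {
    const elapsed = Date.now() - start;
    if (elapsed > MAX_POLL_INTERVAL_MS / 2) {
      console.warn(`[SLOW] ${label} took ${elapsed} ms, risking group eviction`);
    }
  }
}

// Usage inside the 'data' handler would look like:
//   await timed(data.topic, () => this.topicFunctionMap[data.topic](params))
```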

Source: https://github.com/confluentinc/confluent-kafka-dotnet/issues/1155

It says I need to increase the session timeout, so I tried raising it to "session.timeout.ms": 300000 (5 minutes), with "heartbeat.interval.ms": 3000, since the GitHub issues mention that the heartbeat should be <= (session timeout / 3). So I figured 3 seconds would be fine.
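The heartbeat-vs-session rule above is easy to sanity-check. A small sketch (the helper name is mine, not anything from node-rdkafka) that validates heartbeat.interval.ms <= session.timeout.ms / 3:

```typescript
// Rule of thumb: several heartbeats must fit inside one session timeout,
// so heartbeat.interval.ms should be at most session.timeout.ms / 3.
function heartbeatFitsSession(sessionTimeoutMs: number, heartbeatIntervalMs: number): boolean {
  return heartbeatIntervalMs <= sessionTimeoutMs / 3;
}

// The two combinations from the question:
console.log(heartbeatFitsSession(45000, 3000));   // true: 3 s <= 15 s
console.log(heartbeatFitsSession(300000, 3000));  // true: 3 s <= 100 s
```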

With "session.timeout.ms": 300000 and "heartbeat.interval.ms": 3000 the consumer is able to consume and keeps running for a long time, but now the problem is:

  • At first with these configurations, messages arrive within about 0-2 seconds
  • After a while it still receives them, but it takes 1-10 seconds for a message to arrive

The detailed error:

received event => onCustomerServiceRegister
[COMMIT_ERR]  LibrdKafkaError: Broker: Unknown member
    at Function.createLibrdkafkaError [as create] (/src/app/node_modules/node-rdkafka/lib/error.js:454:10)
    at KafkaConsumer.Client._errorWrap (/src/app/node_modules/node-rdkafka/lib/client.js:481:29)
    at KafkaConsumer.commitSync (/src/app/node_modules/node-rdkafka/lib/kafka-consumer.js:560:8)
    at KafkaRDConnect.<anonymous> (/src/app/dist/events/connectors/kafkaRD.js:240:110)
    at step (/src/app/dist/events/connectors/kafkaRD.js:53:23)
    at Object.next (/src/app/dist/events/connectors/kafkaRD.js:34:53)
    at /src/app/dist/events/connectors/kafkaRD.js:28:71
    at new Promise (<anonymous>)
    at __awaiter (/src/app/dist/events/connectors/kafkaRD.js:24:12)
    at KafkaConsumer.<anonymous> (/src/app/dist/events/connectors/kafkaRD.js:213:72)
    at KafkaConsumer.emit (node:events:376:20)
    at KafkaConsumer.EventEmitter.emit (node:domain:470:12)
    at /src/app/node_modules/node-rdkafka/lib/kafka-consumer.js:488:12 {

