
Scenario

We have a use case where our microservice consumes events from one topic and, after some business processing, eventually produces multiple events to different topics.

Service -> consume, then multiple produces
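To make that concrete, the core loop looks roughly like this (a simplified sketch: handleEvent stands in for our business logic, and the consumer/producer are created with the configs shown further below):

    for {
        ev := consumer.Poll(100) // PollTimeout, in ms
        msg, ok := ev.(*kafka.Message)
        if !ok {
            continue // nil (poll timeout) or a non-message event (stats, errors, ...)
        }
        // Business processing returns zero or more output events, keyed by topic.
        for topic, payload := range handleEvent(msg.Value) {
            topic := topic // local copy so taking &topic is safe
            if err := producer.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          payload,
            }, nil); err != nil { // nil channel: delivery reports go to producer.Events()
                log.Printf("produce failed: %v", err)
            }
        }
    }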

We see the following logs after the service has been actively running for a few hours:

|REQTMOUT|rdkafka#producer-2| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2: Timed out ProduceRequest in flight (after 60728ms, timeout #0)
|REQTMOUT|rdkafka#producer-2| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2: 1 request(s) timed out: disconnect (after 706381222ms in state UP)

|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: {{placeholder}}.aws.confluent.cloud:9092: Disconnected (after 345599852ms in state UP, 1 identical error(s) suppressed)
error: Local: Broker transport failure: GroupCoordinator: {{placeholder}}.aws.confluent.cloud:9092: Disconnected (after 345599852ms in state UP, 1 identical error(s) suppressed)

|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/0]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/0: Disconnected (after 345600240ms in state UP, 1 identical error(s) suppressed)
error: Local: Broker transport failure: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/0: Disconnected (after 345600240ms in state UP, 1 identical error(s) suppressed)
|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/8]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/8: Disconnected (after 345600352ms in state UP, 1 identical error(s) suppressed)

Other details

confluent-kafka-go version - v1.7.0 (https://github.com/confluentinc/confluent-kafka-go/)

Kafka cluster: Confluent Cloud Basic plan (since we are currently in the development phase)

Consumer config -

{
    UserName:      username,
    Password:      password,
    Brokers:       broker,
    Topics:        []string{kafkaConfig.CONSUMER_TOPIC},
    ConsumerGroup: kafkaConfig.CONSUMER_GROUP,
    FetchMinBytes: 1000,
    PollTimeout:   100,
}
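This struct is our own wrapper; it presumably translates into a confluent-kafka-go ConfigMap along these lines (a sketch; the SASL/PLAIN mechanism is an assumption based on the usual Confluent Cloud API-key setup):

    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": broker,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms":   "PLAIN", // assumed: Confluent Cloud API key/secret
        "sasl.username":     username,
        "sasl.password":     password,
        "group.id":          kafkaConfig.CONSUMER_GROUP,
        "fetch.min.bytes":   1000, // FetchMinBytes
    })
    if err != nil {
        log.Fatal(err)
    }
    err = consumer.SubscribeTopics([]string{kafkaConfig.CONSUMER_TOPIC}, nil)
    // PollTimeout (100) is the per-call timeout in ms: consumer.Poll(100)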

Producer config -

{
    UserName:  username,
    Password:  password,
    Brokers:   broker,
    Topic:     kafkaConfig.PRODUCER_TOPIC,
    BatchSize: 100000,
    LingerMs:  10,
}
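And the producer equivalent (again a sketch; we assume BatchSize maps to librdkafka's "batch.size" in bytes and LingerMs to "linger.ms"):

    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": broker,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms":   "PLAIN",
        "sasl.username":     username,
        "sasl.password":     password,
        "batch.size":        100000, // BatchSize (bytes)
        "linger.ms":         10,     // LingerMs
    })
    if err != nil {
        log.Fatal(err)
    }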

The service runs in Kubernetes pods deployed on EKS (AWS).

Questions/Queries: As a team, we are new to both Kafka and Golang, so we would appreciate answers to the following.

  • Is there any specific reason why we are seeing these logs?

  • We create the producer during application startup and keep a single producer for the whole application lifetime (see the sketch after this list). Does the producer have a timeout/expiry? Our other option is to create a producer each time we produce a message, but in that case we would end up creating many (thousands of) producers. What would be the best way to handle this?

  • Are there any other configurations or better practices you would recommend?
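For context on the second question, this is roughly how we manage the single producer today (a sketch; the Events()-draining goroutine reflects our understanding that delivery reports must be consumed so the internal queue does not fill up):

    // Created once at startup and shared for the whole application lifetime.
    producer, err := kafka.NewProducer(cfg) // cfg as in the producer config above
    if err != nil {
        log.Fatal(err)
    }

    // Drain delivery reports so librdkafka's internal queue cannot fill up.
    go func() {
        for e := range producer.Events() {
            if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
                log.Printf("delivery failed: %v", m.TopicPartition.Error)
            }
        }
    }()

    // ... Produce() is called from the consume loop shown earlier ...

    // On shutdown: wait up to 15s for in-flight messages, then close.
    producer.Flush(15 * 1000)
    producer.Close()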
