出于某种原因,有时,我们的 Confluent Kafka 消费者会收到关于特定主题的奇怪消息:
{
"magicByte": 2,
"attributes": 0,
"timestamp": "1623227829187",
"offset": "11575814",
"key": null,
"value": {
"type": "Buffer",
"data": []
},
"headers": {},
"isControlRecord": false,
"batchContext": {
"firstOffset": "11575814",
"firstTimestamp": "1623227829187",
"partitionLeaderEpoch": 5,
"inTransaction": false,
"isControlBatch": false,
"lastOffsetDelta": 0,
"producerId": "-1",
"producerEpoch": -1,
"firstSequence": -1,
"maxTimestamp": "1623227829187",
"timestampType": 0,
"magicByte": 2
}
}
即使我吞下了所有错误并在失败后提交了偏移量,我也多次使用了完全相同的消息。
它只发生在一个特定的主题上。我们从 Scarth 删除并重新创建了这个主题,但它没有解决这个问题。
在以下屏幕截图中,我们看到一条消息为空,而其他消息不为空。这是什么意思?
生产者 - C#:
_producerBuilder = new ProducerBuilder<Null, string>(new ProducerConfig
{
BootstrapServers = _kafka.BootstrapServers,
ClientId = Dns.GetHostName(),
LingerMs = _kafka.LingerMsKafka,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = _kafka.SaslUsername,
SaslPassword = _kafka.SaslPassword,
SecurityProtocol = SecurityProtocol.SaslSsl,
EnableSslCertificateVerification = false
}).Build();
消费者 NodeJ:
this.kafka = new Kafka.Kafka({
brokers: [`${connectionOptions.host}:${connectionOptions.port}`],
logLevel: Kafka.logLevel.NOTHING,
ssl: !!connectionOptions.sasl,
sasl: connectionOptions.sasl,
})
// ...
this.consumer = this.kafka.consumer({
groupId: this.groupId,
})
// ...
await this.consumer.connect()
await this.consumer.subscribe({ topic: this.topic, fromBeginning: true })
await this.consumer.run({ batch }: EachBatchPayload) => {/* ...*/})
我们无法使用ccloud使用有问题的消息:错误 - “恐慌:运行时错误:索引超出范围 [0],长度为 0”
我们迷路了。任何帮助都会很棒!
NodeJS:v14.15.4 KafkaJS:1.16.0-beta.18