如果您尝试使用来自消费者组的 100 条消息的新批次,则应将 max_bytes 设置为一个值,对于您的数据模型,该值将始终返回大约 100 条记录。你可以有一个更保守的逻辑(得到更少,然后得到更多,直到截止到 100),或者你可以总是得到更多然后忽略。在这两种方式中,您都应该对您的消费者组采用手动偏移管理。
GET /consumers/testgroup/instances/my_consumer/records?max_bytes=300000
如果您收到超过 100 条消息并且由于某种原因忽略了它们,如果启用了偏移量自动提交(它是在您创建消费者时定义的),您将不会在该消费者组上再次收到它们。你可能不希望这种情况发生!
如果您手动提交偏移量,那么如果您提交正确的偏移量以保证您不会丢失任何消息,那么您可以忽略任何您想要的内容。您可以像这样手动提交偏移量:
POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
"offsets": [
{
"topic": "test",
"partition": 0,
"offset": <calculated offset ending where you stopped consuming for this partition>
},
{
"topic": "test",
"partition": 1,
"offset": <calculated offset ending where you stopped consuming for this partition>
}
]
}
如果您想准确获取该主题的前 100 条记录,那么您需要在再次消费之前重置该主题和每个分区的消费者组偏移量。你可以这样做(取自 confluent):
POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
"offsets": [
{
"topic": "test",
"partition": 0,
"offset": 0
},
{
"topic": "test",
"partition": 1,
"offset": 0
}
]
}