2

I am using confluent Kafka-rest product to consume records from a topic. My intention is to consume only first 100 records from topic. I am using the following REST API to fetch records

GET /consumers/testgroup/instances/my_consumer/records

How to achieve this? Any idea?

4

3 回答 3

3

据我所知,目前这是不可能的。如另一个答案中所述,您可以以字节为单位指定最大大小(尽管在某些情况下实际上可以被代理忽略),但您无法指定所需的消息数量。

但是,这样的功能可以很容易地在您的客户端代码中实现。您可以猜测一个粗略的大小,查询 REST API 并查看您收到了多少条消息。如果小于 100,则再次查询以获取接下来的几条消息,直到达到 100。

于 2018-12-03T13:16:21.737 回答
1

可以使用属性ConsumerConfig.MAX_POLL_RECORDS_CONFIG来配置您的KafkaConsumer. 请看文档

于 2019-06-13T13:55:33.530 回答
0

如果您尝试使用来自消费者组的 100 条消息的新批次,则应将 max_bytes 设置为一个值,对于您的数据模型,该值将始终返回大约 100 条记录。你可以有一个更保守的逻辑(得到更少,然后得到更多,直到截止到 100),或者你可以总是得到更多然后忽略。在这两种方式中,您都应该对您的消费者组采用手动偏移管理。

GET /consumers/testgroup/instances/my_consumer/records?max_bytes=300000

如果您收到超过 100 条消息并且由于某种原因忽略了它们,如果启用了偏移量自动提交(它是在您创建消费者时定义的),您将不会在该消费者组上再次收到它们。你可能不希望这种情况发生!

如果您手动提交偏移量,那么如果您提交正确的偏移量以保证您不会丢失任何消息,那么您可以忽略任何您想要的内容。您可以像这样手动提交偏移量:

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": <calculated offset ending where you stopped consuming for this partition>
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": <calculated offset ending where you stopped consuming for this partition>
    }
  ]
}

如果您想准确获取该主题的前 100 条记录,那么您需要在再次消费之前重置该主题和每个分区的消费者组偏移量。你可以这样做(取自 confluent):

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 0
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 0
    }
  ]
}
于 2018-12-03T12:42:57.640 回答