1

当我运行我的消费者组时,我总是有这样的统计数据:

2016-10-15 13:56:17.925: "STATS": { "name": "debian-meox#producer-2", "type": "producer", "ts":16768436761, "time":1476532577, "replyq":0, "msg_cnt":75428, "msg_size":29314007, "msg_max":100000, "msg_size_max":4096000000, "simple_cnt":0, "brokers":{ "localhost:9092/bootstrap": { "name":"localhost:9092/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989561, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10064, "max":10064, "avg":10064, "sum":10064, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "localhost:9093/bootstrap": { "name":"localhost:9093/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989603, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10078, "max":10078, "avg":10078, "sum":10078, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "localhost:9094/bootstrap": { "name":"localhost:9094/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989343, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10075, "max":10075, "avg":10075, "sum":10075, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "debian-meox:9094/2": { "name":"debian-meox:9094/2", "nodeid":2, "state":"UP", "stateage":4959041, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":472, "max":472, "avg":472, "sum":472, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "debian-meox:9093/1": { "name":"debian-meox:9093/1", "nodeid":1, "state":"UP", "stateage":4958754, "outbuf_cnt":28, "outbuf_msg_cnt":63922, "waitresp_cnt":3, "waitresp_msg_cnt":7488, "tx":1121, "txbytes":587723174, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":633, "rxbytes":24173, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":1248, "max":205146, "avg":22323, "sum":14130815, "cnt":633 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":632 }, "toppars":{ "test_topic": { "topic":"test_topic", "partition":1} } } , "debian-meox:9092/0": { "name":"debian-meox:9092/0", "nodeid":0, "state":"UP", "stateage":4958760, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":328, "max":328, "avg":328, "sum":328, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ "test_topic": { "topic":"test_topic", "partition":0} } } }, "topics":{ "test_topic": { "topic":"test_topic", "metadata_age":4933, "partitions":{ "0": { "partition":0, "leader":0, "desired":false, "unknown":false, "msgq_cnt":0, "msgq_bytes":0, "xmit_msgq_cnt":0, "xmit_msgq_bytes":0, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":0, "txbytes":0, "msgs": 0, "rx_ver_drops": 0 } , "1": { "partition":1, "leader":1, "desired":false, "unknown":false, "msgq_cnt":1572, "msgq_bytes":840177, "xmit_msgq_cnt":2474, "xmit_msgq_bytes":1322717, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":1357083, "txbytes":613073020, "msgs": 1361129, "rx_ver_drops": 0 } , "-1": { "partition":-1, "leader":-1, "desired":false, "unknown":false, "msgq_cnt":0, "msgq_bytes":0, "xmit_msgq_cnt":0, "xmit_msgq_bytes":0, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":0, "txbytes":0, "msgs": 17604, "rx_ver_drops": 0 } } } } }

如您所见,所有偏移量都设置为 -1001:

"committed_offset":-1001,
"eof_offset":-1001,
"lo_offset":-1001,
"hi_offset":-1001,

这是我的设置(也适用于 default_topic):

"auto.commit.enable", "true"
"offset.store.method", "broker"

有时,即使主题中有很多消息,消费者组也无法获取它们。任何想法?

4

1 回答 1

0

所有分区都在"fetch_state": "none"其中,通常意味着它们不是assign():ed,因此不符合消费条件(也可能是它们没有领导者,但这里不是这种情况:)"leader": 1enable.auto.commit只有在从代理获取消息并由您的应用程序使用时,偏移量才会更新和提交(如果为 true(默认))。

你注册了 RebalanceCb 吗?如果是这样,您必须调用assign(partitions...)onERR__ASSIGN_PARTITIONSunassign()on ERR__REVOKE_PARTITIONS

例子:

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
             RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
      consumer->assign(partitions);
    else
      consumer->unassign();
  }
};

完整示例:https ://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consumer_example.cpp

于 2016-10-17T05:35:36.983 回答