0
consumer = Consumer({'bootstrap.servers': bootstrap_server_host,
                               'group.id': group_id,
                               'enable.auto.commit': auto_commit})

consumer.subscribe([topic], on_assign=on_assign_callback, on_revoke=on_revoke_callback);

def on_assign_callback(consumer, partitions):
    get consumer offset use primary key (consumer_group_id, topic, partition_id)

def on_revoke_callback(consumer, partitions):
   store consumer offset use primary key (consumer_group_id, topic, partition_id)

但我无法在这些回调函数中获取 consumer_group_id

4

0 回答 0