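I have 5 consumers for an Avro topic. Each consumer has a different, randomly generated group.id, so no group.id is shared.
I noticed that, no matter how many times I restart them, at least one consumer, and sometimes more, cannot connect to the brokers. They just hang.
So I started changing the value assigned to group.id for the problematic consumers, and I found that the same values always make the consumer work, while some other values never do.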
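
For context, the ids are produced roughly like the sketch below. This is a minimal illustration, not my exact code: the helper name `make_group_id`, the prefix, and the use of `uuid4` are assumptions made for the example.

```python
import uuid


def make_group_id(prefix="services_registry_consumer"):
    """Return a group.id that is unique for each consumer instance.

    Hypothetical helper: the prefix and the uuid4 suffix are assumptions.
    """
    return "%s_%s" % (prefix, uuid.uuid4().hex)


# One id per consumer, so no two consumers share a group
group_ids = [make_group_id() for _ in range(5)]
```

Because every consumer ends up in its own group, each one should be assigned all partitions of the topic.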
First, this is what I'm using:
avro-python3 1.10.1
confluent-kafka 1.5.0
python 3.7
Apache Broker version 0.9.0.1
Confluent Brokers community editions with 5 brokers and schema registry
This is the consumer:
import sys

from confluent_kafka import KafkaError, KafkaException
# Imported as a module so the library class does not clash with the wrapper class below
from confluent_kafka import avro


class AvroConsumer:
    def __init__(self, **kwargs):
        # subscribe() expects a list of topic names
        self.consumer_topic = ['my-avro-topic']
        consumer_conf = {'session.timeout.ms': 6000,
                         'enable.auto.commit': 'true',
                         'log.connection.close': 'false',
                         'default.topic.config': {'auto.offset.reset': 'latest'},
                         'schema.registry.url': 'http://xxxxxx:8081',
                         'bootstrap.servers':
                             'aaaaaa:9092,bbbbbbbbb:9092,cccccc:9092,dddddd:9092,eeeeee:9092',
                         'security.protocol': 'plaintext',
                         'group.id': 'services_registry_consumer_test_01',
                         'debug': 'consumer',
                         'api.version.request': 'false',
                         'broker.version.fallback': '0.9.0.1'}
        self.consumer = avro.AvroConsumer(consumer_conf)

    def run(self):
        sys.stderr.write(
            'Consumer started for topic(s) %s \n' % self.consumer_topic)
        try:
            self.consumer.subscribe(self.consumer_topic, on_assign=self.print_assignment)
            while True:
                incoming_message = self.consumer.poll(1)
                if incoming_message is None:
                    # Nothing arrived within the 1 second poll timeout
                    continue
                elif incoming_message.error() is None:
                    message_payload = incoming_message.value()
                    sys.stderr.write(
                        '[Topic:%s][Message:%s] \n' % (
                            incoming_message.topic().upper(), message_payload))
                elif incoming_message.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (incoming_message.topic(), incoming_message.partition(),
                                      incoming_message.offset()))
                else:
                    # Any other error is fatal for this consumer
                    sys.stderr.write(
                        f' Consumer Error {incoming_message.error()}\n')
                    raise KafkaException(incoming_message.error())
        except Exception as whatever_it_is:
            sys.stderr.write(
                ' EXCEPTION %s on the consumer for topic(s) %s\n' % (
                    whatever_it_is, self.consumer_topic))
            sys.exit(1)
        finally:
            sys.stderr.write(
                'CLOSING consumer %s with topic(s): %s\n' % (
                    self.consumer, self.consumer_topic))
            self.consumer.close()
        return 1

    def print_assignment(self, consumer, partitions):
        # Rebalance callback: log the partitions assigned to this consumer
        sys.stderr.write(
            'Topic:%s Assignment:%s\n' % (
                self.consumer_topic, partitions))
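
To get more detail out of librdkafka when a consumer hangs, the `debug` property accepts a comma-separated list of contexts. The sketch below widens it beyond `consumer`. The helper name `with_verbose_debug` is hypothetical, and I am assuming the `cgrp`, `broker` and `protocol` contexts are available in this librdkafka build.

```python
def with_verbose_debug(conf, contexts=("consumer", "cgrp", "broker", "protocol")):
    """Return a copy of conf whose 'debug' entry lists the given contexts.

    Hypothetical helper: the original dict is left untouched.
    """
    verbose = dict(conf)
    verbose["debug"] = ",".join(contexts)
    return verbose


# Only the two keys relevant to the example are shown here
base_conf = {"group.id": "services_registry_consumer_test_02", "debug": "consumer"}
verbose_conf = with_verbose_debug(base_conf)
```

The resulting dict would be passed to the consumer in place of the original configuration.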
For example, if I set group.id to services_registry_consumer_test_01, everything works as expected and I get:
%7|1610549830.729|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS STATIC_LIB_zlib ZLIB SSL SASL_CYRUS STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
[2021-01-13T14:57:11+00:00] Communication thread started for topic(s) ['my-avro-topic']
%7|1610549831.591|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_01": subscribe to new subscription of 1 topics (join state init)
%7|1610549831.591|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_01" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1610549832.135|JOIN|rdkafka#consumer-1| [thrd:main]: xxxxxxx:9092/4: Joining group "services_registry_consumer_test_01" with 1 subscribed topic(s)
%5|1610549832.135|MAXPOLL|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/4: Broker does not support KIP-62 (requires Apache Kafka >= v0.10.1.0): consumer configuration `max.poll.interval.ms` (300000) is effectively limited by `session.timeout.ms` (6000) with this broker version
%7|1610549835.397|ASSIGNOR|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_01": "range" assignor run for 1 member(s)
[2021-01-13T14:57:15+00:00]Topic:['my-avro-topic'] Assignment:[TopicPartition{topic=my-avro-topic,partition=0,offset=-1001,error=None}, TopicPartition{topic=my-avro-topic,partition=1,offset=-1001,error=None}, TopicPartition{topic=my-avro-topic,partition=2,offset=-1001,error=None}, TopicPartition{topic=my-avro-topic,partition=3,offset=-1001,error=None}, TopicPartition{topic=my-avro-topic,partition=4,offset=-1001,error=None}]
%7|1610549835.488|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_01": new assignment of 5 partition(s) in join state wait-assign-rebalance_cb
%7|1610549835.488|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/4: Fetch committed offsets for 5/5 partition(s)
%7|1610549835.890|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [3] start fetching at offset 0
%7|1610549835.974|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [4] start fetching at offset 90
%7|1610549835.974|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [0] start fetching at offset 0
%7|1610549835.991|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [1] start fetching at offset 0
%7|1610549835.994|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [2] start fetching at offset 0
If I set group.id to services_registry_consumer_test_02, the same consumer that worked before now hangs without managing to subscribe to the given topic:
%7|1610549718.744|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS STATIC_LIB_zlib ZLIB SSL SASL_CYRUS STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
[2021-01-13T14:55:19+00:00] Communication thread started for topic(s) ['my-avro-topic']
%7|1610549719.482|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_02": subscribe to new subscription of 1 topics (join state init)
%7|1610549719.482|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_02" is rebalancing in state init (join-state init) without assignment: unsubscribe
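
To compare the two runs more easily, the debug output can be reduced to its sequence of stage tags (INIT, SUBSCRIBE, REBALANCE, JOIN, ...). The following is a rough sketch: the regular expression is inferred only from the log lines shown in this post, and the names `LOG_LINE` and `stage_tags` are made up for the example.

```python
import re

# Matches debug lines shaped like:
# %7|<timestamp>|<TAG>|<client>| [thrd:<name>]: <message>
LOG_LINE = re.compile(
    r"^%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<tag>[A-Z_]+)\|(?P<client>[^|]+)\|"
    r"\s*\[thrd:(?P<thread>[^\]]+)\]:\s*(?P<message>.*)$"
)


def stage_tags(log_text):
    """Return the stage tags of all librdkafka debug lines, in order."""
    tags = []
    for line in log_text.splitlines():
        match = LOG_LINE.match(line.strip())
        if match:  # application log lines do not match and are skipped
            tags.append(match.group("tag"))
    return tags


hanging_run = '''
[2021-01-13T14:55:19+00:00] Communication thread started for topic(s) ['my-avro-topic']
%7|1610549719.482|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_02": subscribe to new subscription of 1 topics (join state init)
%7|1610549719.482|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_02" is rebalancing in state init (join-state init) without assignment: unsubscribe
'''

print(stage_tags(hanging_run))
```

In the hanging run the sequence stops after REBALANCE, while the working run continues with JOIN, ASSIGNOR, ASSIGN, OFFSET and FETCH.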
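If I set the value back to services_registry_consumer_test_01, everything starts working as expected again. I know this may sound strange, but that is how it behaves, and I have been stuck on it for several days.
I don't know whether I am doing something wrong here, and I would really appreciate your thoughts on this.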
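
One thing I have not ruled out: as far as I understand, the broker maps each group.id to a partition of the internal `__consumer_offsets` topic by hashing it, and the leader of that partition acts as the group coordinator. That would mean different group.id values can land on different coordinator brokers. The sketch below reproduces that mapping so the two ids can be compared. It assumes the default of 50 offsets partitions (`offsets.topic.num.partitions`) and that the hash is Java's `String.hashCode()`; the function names are hypothetical.

```python
def java_string_hash(text):
    """Replicate Java's String.hashCode() as a signed 32-bit integer."""
    data = text.encode("utf-16-be")  # Java hashes UTF-16 code units
    result = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        result = (31 * result + unit) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as signed
    return result - 0x100000000 if result >= 0x80000000 else result


def offsets_partition_for(group_id, num_partitions=50):
    """Guess which __consumer_offsets partition a group.id maps to.

    Assumption: abs(hashCode) % num_partitions, with the minimum
    32-bit integer mapped to 0.
    """
    hashed = java_string_hash(group_id)
    positive = 0 if hashed == -0x80000000 else abs(hashed)
    return positive % num_partitions


for gid in ("services_registry_consumer_test_01",
            "services_registry_consumer_test_02"):
    print(gid, "->", offsets_partition_for(gid))
```

If the two ids map to partitions led by different brokers, that might explain why only some values work, but I have not confirmed this.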