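My Python consumer client works fine when run standalone, but fails to retrieve any messages when run as multiprocessing workers with the same configuration.
The client always ends up in the block that prints a message because msg is None. Any help diagnosing this would be much appreciated.
The worker basically looks like: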
from multiprocessing import Process
...
class saListener(Process):
    def __init__(self, n):
        self.ClientName = "saListener-" + str(n)
        ...
        schema_registry_client = SchemaRegistryClient(schema_registry_conf)
        value_avro_deserializer = AvroDeserializer(ccloud_lib.value_schema, schema_registry_client)
        conf["value.deserializer"] = value_avro_deserializer
        # Note: the consumer is created here, in the parent process
        self.cons = DeserializingConsumer(conf)
        Process.__init__(self)
    def connect(self):
        self.cons.subscribe([self.topic])
    def run(self):
        # Runs in the child process after start()
        while True:
            msg = self.cons.poll(5.0)
            if msg is None:
                print(self.ClientName + ":Waiting for message or event/error in poll()")
The controller looks like:
for n in range(instances):
    lnr_instance = saListener(n)
    lnr_instance.connect()
    lnr_instance.start()
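One thing I still want to rule out (an assumption on my part, not a confirmed diagnosis): in the code above the consumer is created and subscribed in the parent process, and only poll() runs in the child. As far as I know, librdkafka client handles are not fork-safe, so a variant that builds the consumer inside run() would look roughly like the sketch below. The names LazyListener, consumer_factory and max_polls are hypothetical and only there for illustration; in my case the factory would return DeserializingConsumer(conf).

```python
from multiprocessing import Process


class LazyListener(Process):
    """Worker that builds its consumer inside the child process."""

    def __init__(self, n, topic, consumer_factory, max_polls=None):
        super().__init__()
        self.client_name = "saListener-" + str(n)
        self.topic = topic
        # Hypothetical callable returning a new consumer object,
        # e.g. a function that returns DeserializingConsumer(conf).
        # Nothing is created or connected in the parent.
        self.consumer_factory = consumer_factory
        self.max_polls = max_polls  # None means poll forever
        self.received = []

    def run(self):
        # run() executes in the child process, so the client and its
        # background threads are created there instead of being inherited.
        consumer = self.consumer_factory()
        consumer.subscribe([self.topic])
        polls = 0
        try:
            while self.max_polls is None or polls < self.max_polls:
                polls += 1
                msg = consumer.poll(5.0)
                if msg is None:
                    print(self.client_name + ": waiting for message or event/error in poll()")
                    continue
                self.received.append(msg)
        finally:
            # Leave the group cleanly when the loop ends or fails.
            consumer.close()
```

With this shape the controller would only call LazyListener(n, topic, factory).start(), with no connect() in the parent. If the spawn start method is in use, the factory has to be picklable, so a module-level function rather than a lambda.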
Client configuration:
"bootstrap.servers" : "srv1:909,srv2:909",
"group.id" : "ainvil9_intraday_group",
"debug" : "all",
"max.poll.interval.ms" : "30000",
"enable.auto.commit" : "true",
"fetch.wait.max.ms" : "1000",
"session.timeout.ms" : "10000",
"auto.commit.interval.ms" : "500",
"sasl.mechanism" : "GSSAPI",
"security.protocol" : "SASL_PLAINTEXT",
"sasl.kerberos.service.name" : "kafka",
"ssl.ca.location" : "security/ca.cert.pem",
"sasl.kerberos.kinit.cmd" : "kinit -R -p -kt security/kafka_ist_producer.keytab kafka_ist_producer@DMS",
"sasl.kerberos.keytab" : "security/kafka_ist_producer.keytab",
"sasl.kerberos.principal" : "kafka_ist_producer@DMS"
It looks like the consumers, when run as multiprocessing workers, never obtain offsets:
%7|1596766163.070|HEARTBEAT|rdkafka#consumer-2| [thrd:main]: GroupCoordinator/9: Heartbeat for group "ainvil9_intraday_group" generation id 1
%7|1596766163.070|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.070|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.070|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/9: Sent HeartbeatRequest (v3, 97 bytes @ 0, CorrId 12)
%7|1596766163.070|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/9: Sent HeartbeatRequest (v3, 97 bytes @ 0, CorrId 12)
%7|1596766163.073|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/9: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 3.03ms)
%7|1596766163.073|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/9: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 3.08ms)
%7|1596766163.181|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.181|UNASSIGN|rdkafka#consumer-3| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.573|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.573|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.573|COMMIT|rdkafka#consumer-4| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.573|UNASSIGN|rdkafka#consumer-4| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.573|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.573|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.682|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
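To see whether each worker is ever handed partitions (the log above only shows "without new assignment"), a small diagnostic I could add is a pair of rebalance callbacks passed to subscribe() via its on_assign and on_revoke arguments. This is a minimal sketch; make_assignment_logger is a hypothetical helper name, and it only assumes that the partition objects expose topic and partition attributes, as TopicPartition does.

```python
def make_assignment_logger(client_name, log=print):
    """Build on_assign/on_revoke callbacks that report partition changes."""

    def describe(partitions):
        # Render each partition as topic[partition]; note an empty list explicitly.
        items = ["%s[%d]" % (p.topic, p.partition) for p in partitions]
        return ", ".join(items) or "no partitions"

    def on_assign(consumer, partitions):
        log("%s: assigned %s" % (client_name, describe(partitions)))

    def on_revoke(consumer, partitions):
        log("%s: revoked %s" % (client_name, describe(partitions)))

    return on_assign, on_revoke
```

Usage would be along the lines of on_assign, on_revoke = make_assignment_logger(self.ClientName) followed by self.cons.subscribe([self.topic], on_assign=on_assign, on_revoke=on_revoke). The callbacks are served from poll(), so they only fire in the process that polls.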