0

好吧,我正在尝试在 python 中使用 Kafka-python 包(1.3.2)来进行从生产者到消费者的简单数据传输。

制片人:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# produce asynchronously
for _ in range(2):
    producer.send('my-topic', b'message')
    producer.flush()
producer = KafkaProducer()

消费者:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                     group_id='my-group',
                     bootstrap_servers=['localhost:9092'],fetch_min_bytes=1)
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                      message.offset, message.key,
                                      message.value))

consumer = KafkaConsumer()
consumer.subscribe(["my-topic"])

我收到以下关于我的消费者的信息:

my-topic:0:5056: key=None value=b'message' my-topic:0:5057: key=None value=b'message'

但与此同时,我在制片人那里遇到了这个错误:

Error in atexit._run_exitfuncs: Traceback (most recent call last): File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 364, in wrapper _self.close() File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 420, in close self._sender.join(timeout) File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1060, in join self._wait_for_tstate_lock(timeout=max(timeout, 0)) File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1072, in _wait_for_tstate_lock elif lock.acquire(block, timeout): OverflowError: timeout value is too large

默认情况下,超时设置为NONE,并且正在设置为999999999in Kafka.py。我无法弄清楚在 KafkaProducer 中传递此超时的参数 - 在我的生产者代码中。

有没有人遇到过这个问题?或者任何人都可以在这个方向上帮助我。提前致谢。

4

2 回答 2

0

我认为您的问题可能源于您使用的是 32 位窗口这一事实。据我所知,kafka-python 代码中没有明确支持 32 位。

据我所知,您的实现看起来不错。

于 2017-09-18T06:43:42.600 回答
0

根据此处的 py3 文档,如果超时参数太大(大于 TIMEOUT_MAX)将抛出溢出错误。

于 2020-02-07T00:44:43.587 回答