0

我正在尝试将 kafka 与 python 与 pykafka 一起使用,当我尝试使用 linger_ms 时出现此错误:

类型错误:生产()得到了一个意外的关键字参数“linger_ms”

这是我的代码:

import queue
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093",broker_version="1.0.0")
topic = client.topics['mytopic']

with topic.get_producer(delivery_reports=True) as producer:
     count = 0
     while True:
         count += 1
         producer.produce(
             'test msg'.encode(encoding='UTF-8'), 
             partition_key=('{}'.format(count))
                          .encode(encoding='UTF-8'),
             timestamp=(datetime.datetime.now())+timedelta(seconds=120),
             linger_ms=120000)
4

1 回答 1

0

a上的produce()方法Producer不带linger_ms参数。这就是您收到此错误的原因。

linger_ms初始化时传递参数Producer

with topic.get_producer(delivery_reports=True, linger_ms=120000) as producer:
    ...
于 2019-08-26T14:35:24.680 回答