2

我使用 pykafka 编写了一个简单的生产者,但似乎无法让它执行。基本生产者和生产调用如下。当我用一条小消息调用它 100 次并添加一些计时/分析代码时,大约需要 14 秒。我知道这是异步发送消息,所以我希望它非常快。我缺少一些设置吗?我也用 min_queued_messages=1 尝试过,这需要大约 2 秒的时间。

from pykafka import KafkaClient
import time

client = KafkaClient(hosts="kafka1.mydomain.com:9092", exclude_internal_topics=False)
topic = client.topics['mytopic']

start = time.time()

for x in xrange(100):
    with topic.get_producer(delivery_reports=False,
                            sync=True,
                            linger_ms=0) as producer:
        producer.produce("This is a message")

end = time.time()
print "Execution Time (ms): %s" % round((end - start) * 1000)

我确实在 pycharm 中对此进行了分析,并且说 78.8% 的时间都花在了“time.sleep”上?!为什么会睡着?

4

1 回答 1

1

topic.get_producer调用应在生产者生命周期开始时调用一次。像您的示例代码那样在紧密循环中调用它会导致初始化序列重复运行,这是不必要的,并且会增加很多开销。如果将代码更改为以下内容,您的代码会运行得更快:

with topic.get_producer(delivery_reports=False,
                        sync=True,
                        linger_ms=0) as producer:
    for x in xrange(100):
        producer.produce("This is a message")
于 2018-04-09T23:23:15.817 回答