1

我正在使用 pykafka,并且有一个异步生产并使用 delivery_reports 的生产者。我知道必须使用“get_delivery_report”方法阅读交付报告,并且我知道必须在与生成的消息相同的线程中调用它。但是,get_delievery_report 是否必须在每次调用后调用才能生成,还是可以调用一次?如果发生不止一个,get_delivery_report 将返回所有失败的发送。例如,假设我异步发送 100 条消息:

for x in xrange(100):
   with topic.get_producer(delivery_reports=Try, sync=False) as producer:
      producer.produce("Test Message")

msg, exc = producer.get_delivery_report()

还是必须是:

for x in xrange(100):
   with topic.get_producer(delivery_reports=Try, sync=False) as producer:
      producer.produce("Test Message")
      msg, exc = producer.get_delivery_report()

第一个似乎比第二个运行得快得多。

4

1 回答 1

0

除了此代码过度使用的问题topic.get_producer(在我的答案中解决之外,第一个示例运行速度比第二个快得多的原因是第二个示例有效地以同步模式运行。也就是说,生成的每条消息都会导致等待传递确认,然后才能生成下一条消息。如果您对编写一个异步生成的应用程序感兴趣,那么您可能对与第一个示例更接近的东西更感兴趣。pykafka 自述文件中列出了执行此操作的正确方法:

with topic.get_producer(delivery_reports=True) as producer:
    count = 0
    while True:
        count += 1
        producer.produce('test msg', partition_key='{}'.format(count))
        if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)
            while True:
                try:
                    msg, exc = producer.get_delivery_report(block=False)
                    if exc is not None:
                        print 'Failed to deliver msg {}: {}'.format(
                            msg.partition_key, repr(exc))
                    else:
                        print 'Successfully delivered msg {}'.format(
                        msg.partition_key)
                except Queue.Empty:
                    break

此代码10 ** 5异步生成消息,然后停止生成以使用传递报告队列,该队列包含每条生成的消息的报告。它打印任何报告的交付错误,并允许在整个队列被消耗后恢复生产。该10 ** 5数字可以根据您的内存限制进行调整 - 它有效地限制了传递报告队列的大小。

于 2018-04-10T16:43:11.553 回答