
I use pykafka to fetch messages from a kafka topic, then do some processing and update them into mongodb. Since pymongo can only update one item at a time, I start 100 processes. But on startup, some of the processes raise the errors "PartitionOwnedError" and "ConsumerStoppedException". I don't know why. Thank you.

import datetime

from pykafka import KafkaClient

# conf, topic_name, group, collection and log come from the surrounding
# application code (not shown)
kafka_cfg = conf['kafka']
kafka_client = KafkaClient(kafka_cfg['broker_list'])
topic = kafka_client.topics[topic_name]

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=group,
    auto_commit_enable=kafka_cfg['auto_commit_enable'],
    zookeeper_connect=kafka_cfg['zookeeper_list'],
    zookeeper_connection_timeout_ms=kafka_cfg['zookeeper_conn_timeout_ms'],
    consumer_timeout_ms=kafka_cfg['consumer_timeout_ms'],
)
while True:
    # consumer_timeout_ms makes the iterator stop when no message arrives
    # in time, so the outer loop restarts the iteration
    for msg in balanced_consumer:
        if msg is not None:
            try:
                # eval on untrusted input is risky; json.loads is safer
                # if the payload is JSON
                value = eval(msg.value)
                id = long(value.pop("id"))
                value["when_update"] = datetime.datetime.now()
                query = {"_id": id}

                # third argument upsert=True: insert if the document is missing
                result = collection.update_one(query, {"$set": value}, True)
            except Exception as e:
                log.error("Fail to update: %s, msg: %s", e, msg.value)


Traceback (most recent call last):
  File "dump_daily_summary.py", line 182, in <module>
    dump_daily_summary.run()
  File "dump_daily_summary.py", line 133, in run
    for msg in self.balanced_consumer:
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
    message = self.consume(block=True)
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 734, in consume
    raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException


Traceback (most recent call last):
  File "dump_daily_summary.py", line 182, in <module>
    dump_daily_summary.run()
  File "dump_daily_summary.py", line 133, in run
    for msg in self.balanced_consumer:
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
    message = self.consume(block=True)
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 726, in consume
    self._raise_worker_exceptions()
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 271, in _raise_worker_exceptions
    raise ex
pykafka.exceptions.PartitionOwnedError

3 Answers


PartitionOwnedError: check whether some background processes in the same consumer_group are consuming; there may not be enough available partitions left to start another consumer.
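To verify this, here is a minimal sketch (the broker address and topic name below are placeholders, not from the question) that compares the topic's partition count against the number of consumer processes you plan to start:

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")  # placeholder broker address
topic = client.topics["my_topic"]             # placeholder topic name

# A balanced consumer group can own at most one consumer per partition,
# so consumers beyond this count will have no partition to claim.
print len(topic.partitions)

If this number is smaller than the number of processes you start (100 in the question), the extra consumers cannot own a partition, which may explain the errors above.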

ConsumerStoppedException: you can try upgrading your pykafka version (https://github.com/Parsely/pykafka/issues/574).

answered 2016-10-11T21:45:10.207

I ran into the same problem as you. However, I was confused by other people's solutions, like adding enough partitions for the consumers or updating the version of pykafka. In fact, my setup already satisfied those conditions.

These are the versions of the tools:

Python 2.7.10

Kafka 2.11-0.10.0.0

Zookeeper 3.4.8

pykafka 2.5.0

Here is my code:

class KafkaService(object):
    def __init__(self, topic):
        # get_conf is a project helper (not shown) that reads configuration
        self.client_hosts = get_conf("kafka_conf", "client_host", "string")
        self.topic = topic
        self.con_group = topic
        self.zk_connect = get_conf("kafka_conf", "zk_connect", "string")

    def kafka_consumer(self):
        """kafka-consumer client, using pykafka

        :return: {"id": 1, "url": "www.baidu.com", "sitename": "baidu"}
        """
        from pykafka import KafkaClient
        consumer = None
        try:
            kafka = KafkaClient(hosts=str(self.client_hosts))
            topic = kafka.topics[self.topic]

            consumer = topic.get_balanced_consumer(
                consumer_group=self.con_group,
                auto_commit_enable=True,
                zookeeper_connect=self.zk_connect,
            )
        except Exception as e:
            logger.error(str(e))
            raise  # don't fall through to the loop with consumer unset

        while True:
            # block=False returns immediately (None when no message is ready)
            message = consumer.consume(block=False)
            if message:
                print "message:", message.value
                yield message.value

Both exceptions (ConsumerStoppedException and PartitionOwnedError) are raised by the consume(block=True) function of pykafka.balancedconsumer.

Of course, I recommend you read the source code of that function.

There is a parameter, block=True; after changing it to False, the program no longer runs into these exceptions.

Then the kafka consumer works fine.
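A minimal sketch of that non-blocking polling pattern, with a short sleep added on idle (the sleep and the handle callable are my additions, not part of the answer's code):

import time

def poll_forever(consumer, handle):
    # consume(block=False) returns None immediately when no message is
    # ready, instead of blocking inside the call that raises the
    # exceptions above
    while True:
        message = consumer.consume(block=False)
        if message:
            handle(message.value)
        else:
            time.sleep(0.1)  # back off briefly so the loop does not spin

Here handle is any callable that processes the raw message value, e.g. the mongodb update from the question.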

answered 2016-11-28T13:54:04.030

This behavior is affected by a long-standing bug that was recently discovered and is currently being fixed. The workaround we use in production at Parse.ly is to run our consumers in an environment that automatically restarts them when they crash with these errors, until all partitions are owned.
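A rough sketch of such a restart wrapper (illustrative only; the names build_consumer and handle are placeholders, and the answer does not describe Parse.ly's actual setup):

import logging
import time

from pykafka.exceptions import ConsumerStoppedException, PartitionOwnedError

def run_forever(build_consumer, handle):
    while True:
        try:
            # re-create the consumer after every crash so it rejoins
            # the consumer group and participates in a fresh rebalance
            consumer = build_consumer()
            for msg in consumer:
                if msg is not None:
                    handle(msg)
        except (ConsumerStoppedException, PartitionOwnedError) as e:
            logging.warning("consumer crashed (%s); restarting", e)
            time.sleep(5)  # back off before rejoining the group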

answered 2017-09-14T18:50:53.147