apache-storm-1.0.2
hbase-1.2.1
kafka_2.10-0.10.0.0
zookeeper-3.4.9

Steps

zkServer.sh start
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &!
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 4 --topic project

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --list

    __consumer_offsets
    logons
    metrics
    notifications
    ticks
    project

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic project

Topic:project   PartitionCount:4        ReplicationFactor:1     Configs:
        Topic: project  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: project  Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: project  Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: project  Partition: 3    Leader: 0       Replicas: 0     Isr: 0

zkServer.sh status

ZooKeeper JMX enabled by default
Using config: /home/dmitry/Development/zookeeper/bin/../conf/zoo.cfg
Mode: standalone

java -version 
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

uname -a                         
Linux dmitry.npb.io 4.9.13-200.fc25.x86_64 

Now I run the following Python code in two different tmux panes:

from pykafka import KafkaClient, SslConfig
from pykafka.exceptions import ConsumerStoppedException, PartitionOwnedError

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'project']

try:
    messages = topic.get_balanced_consumer(
        consumer_group=b'project_bigtable',
        zookeeper_connect='localhost:2181')
except (ConsumerStoppedException, PartitionOwnedError):
    print('No connection')
    messages.stop()
    messages.start()

print('Connected')

for message in messages:
    if message is not None:
        print(message.offset, message.value)
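
As an aside, in the code above `messages` is only bound if `get_balanced_consumer` succeeds, so the `except` branch would itself fail with a `NameError` on `messages.stop()`. A defensive retry helper avoids that; this is only a sketch, where `connect` stands in for the `topic.get_balanced_consumer(...)` call:

```python
import time

def connect_with_retry(connect, retries=3, delay=1.0):
    """Call `connect()` until it succeeds or retries are exhausted.

    `connect` stands in for e.g. topic.get_balanced_consumer(...); any
    exception it raises triggers another attempt after `delay` seconds.
    In the real code the except clause would catch only
    (ConsumerStoppedException, PartitionOwnedError).
    """
    last_exc = None
    for _ in range(retries):
        try:
            return connect()
        except Exception as exc:
            last_exc = exc
            time.sleep(delay)
    raise last_exc

# Hypothetical usage: a connect function that fails twice, then succeeds
attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("No connection")
    return "consumer"

assert connect_with_retry(flaky, retries=5, delay=0) == "consumer"
```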

The first run is fine; when I start a second instance in a different pane, window, or terminal, I get the following traceback:

python consumer.py

Failed to acquire partition <pykafka.partition.Partition at 0x7fb8cb9b3cf8 (id=3)> after 4 retries.
Stopping consumer in response to error
Traceback (most recent call last):
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 633, in _add_partitions
    ephemeral=True
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/client.py", line 834, in create
    sequence=sequence, makepath=makepath).get()
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/handlers/utils.py", line 78, in get
    raise self._exception
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/handlers/utils.py", line 206, in captured_function
    return function(*args, **kwargs)
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/handlers/utils.py", line 224, in captured_function
    value = function(*args, **kwargs)
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/client.py", line 889, in create_completion
    return self.unchroot(result.get())
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/handlers/utils.py", line 72, in get
    raise self._exception
kazoo.exceptions.NodeExistsError: ((), {})

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 306, in start
    self._rebalance()
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 603, in _rebalance
    self._update_member_assignment()
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 577, in _update_member_assignment
    self._add_partitions(new_partitions - current_zk_parts)
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 636, in _add_partitions
    raise PartitionOwnedError(p)
pykafka.exceptions.PartitionOwnedError
Connected
Traceback (most recent call last):
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 731, in consume
    message = self._consumer.consume(block=block)
AttributeError: 'NoneType' object has no attribute 'consume'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "consumer.py", line 18, in <module>
    for message in messages:
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 745, in __iter__
    message = self.consume(block=True)
  File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 734, in consume
    raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException

I'm using Fedora 25 / Python 3.5.2.


1 Answer


I decided to use kafka-python instead, because it doesn't have the failures with partitioned Kafka topics that I ran into with pykafka.

Here is my working code:

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('trivver',
                         group_id='trivver_bigtable',
                         bootstrap_servers=['localhost:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

The benefits are:

  • I don't need to pass a ZooKeeper connection to the consumer's init; it balances the workers properly on its own
  • When I have more workers than available partitions in the Kafka topic, the extras simply wait instead of crashing with an exception (as happens with pykafka)
  • New workers join without problems; during a 24-hour test under heavy Kafka load there were no crashes and no noticeable memory leaks
  • It supports a msgpack deserializer out of the box (which is exactly what I needed):

    KafkaConsumer(value_deserializer=msgpack.unpackb)
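
The `value_deserializer` is just a callable that kafka-python applies to each record's raw bytes, so any bytes-to-object function plugs in the same way. A minimal sketch using the standard library's `json` and a hypothetical payload (`msgpack.unpackb` slots in identically):

```python
import json

# kafka-python applies value_deserializer to the raw bytes of each record,
# so any bytes -> object callable works; msgpack.unpackb is a drop-in here.
def json_deserializer(raw):
    return json.loads(raw.decode("utf-8"))

# Hypothetical raw payload, as the broker would deliver it
raw_value = b'{"user": "dmitry", "event": "logon"}'
assert json_deserializer(raw_value) == {"user": "dmitry", "event": "logon"}

# Wired into the consumer (requires a running broker):
# consumer = KafkaConsumer('trivver',
#                          group_id='trivver_bigtable',
#                          bootstrap_servers=['localhost:9092'],
#                          value_deserializer=json_deserializer)
```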

Answered 2017-03-24T21:21:09.810