我是 Kafka 的新手,正在尝试在其上构建服务消息传递平台的服务。这是我的设置:
卡夫卡 0.9.0.1
动物园管理员 3.4.8卡夫卡
-蟒蛇1.3.3
我的应用程序创建了一个KafkaProducer
,我从中将消息流发送到具有 6 个分区的单个主题。我还创建了 7 个KafkaConsumer
(在一个 s 下group_id
,其中 6 个分配给 6 个分区,一个处于空闲状态(这是预期的)。当生产者正在流式传输时,我将分区计数增加到 7,并期望流不会分布在 7 个分区上并且会唤醒空闲的消费者。但是,在我通过重新启动应用程序重新初始化它之前,生产者似乎不会拿起新添加的分区。我缩放分区计数通过运行:
kafka-topics --alter --zookeeper localhost:2181 --topic test --partitions 7
生产者有没有办法在不重新初始化的情况下获取分区计数的变化?
以下是相关的代码片段:
制片人
class Producer(threading.Thread):
daemon = True
def __init__(self, name, manager):
super(Producer, self).__init__()
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
def run(self):
while not self.killed:
if not self.q.empty():
self._busy()
self.producer.send('test', value=self.q.get())
else:
self._free()
消费者
class Consumer(threading.Thread):
daemon = True
def __init__(self, name, manager):
super(Consumer, self).__init__()
self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
group_id='test_group',
client_id="Consumer " + self.name)
self.consumer.subscribe(['test'])
def run(self):
while not self.killed:
messages = self.consumer.poll()
for topic, records in messages.iteritems():
print self.consumer.config['client_id'] + ": " + str(records)