我已经通过 Bitnami AMI 映像设置了一个运行 Apache Kafka 0.8 的 AWS EC2 实例。服务器属性几乎是默认的(Kafka 位于 localhost:9092,zookeeper 位于 localhost:2181)。
当我通过 SSH 连接到机器时,我可以使用位于 kafka/bin 的 Kafka 提供的脚本来生成/使用数据。为了生产,我运行以下命令:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
这工作正常,因此我确定 Kafka 工作正常。接下来,我尝试使用 python 库 pykafka 从我的机器上生产/消费:
client = KafkaClient(hosts = KAFKA_HOST)
topic = client.topics[sys.argv[1]]
try:
with topic.get_producer(max_queued_messages=1, auto_start=True) as producer:
while True:
for i in range(10):
message = "Test message sent on: " + str(datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y"))
encoded_message = message.encode("utf-8")
mess = producer.produce(encoded_message)
except Exception as error:
print('Something went wrong; printing exception:')
print(error)
我消费如下:
client = KafkaClient(hosts = KAFKA_HOST)
topic = client.topics[sys.argv[1]]
try:
while True:
consumer = topic.get_simple_consumer(auto_start=True)
for message in consumer:
if message is not None:
print (message.offset, message.value)
except Exception as error:
print('Something went wrong; printing exception:')
print(error)
这些片段运行时没有错误或异常,但不会产生或使用任何消息,甚至是通过本地脚本创建的消息。
我已经确认 9092 和 2181 端口都是通过 telnet 打开的。我的问题如下:
- 有没有办法调试此类问题并找到根本原因?如果存在一些连接问题,我希望库会抛出异常。
- 到底是怎么回事?