所以我一直在尝试在非 kafka 集群上使用 pykafka 发送消息(它没有 kafka,只有必要的库)使用附加的代码片段,我将消息发送到 kafka 集群节点以供使用。但它返回超时异常。
我已经尝试了几乎所有可用的东西,即使是在 stackoverflow 上已经创建的问题。
问题:
是否需要在我的非 kafka 集群上也有 kafka 才能成功通信?(不要这么认为)
任何帮助,将不胜感激 ??
服务器配置:
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://0.0.0.0:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://abc-Kka-00:9092```
Python code:
from pykafka import KafkaClient
import threading
KAFKA_HOST = "a.b.c.d:9092" # Or the address you want
client = KafkaClient(hosts = KAFKA_HOST)
topic = client.topics["kafkat"]
with topic.get_sync_producer() as producer:
for i in range(10):
message = "Test message " + str(i)
encoded_message = message.encode("utf-8")
producer.produce(encoded_message)
Error recived:
```pykafka.exceptions.ProduceFailureError: Delivery report not received after timeout```
Expected result:
Message to be passed and consumed on kafka node