0

所以我一直在尝试在非 kafka 集群上使用 pykafka 发送消息(它没有 kafka,只有必要的库)使用附加的代码片段,我将消息发送到 kafka 集群节点以供使用。但它返回超时异常。

我已经尝试了几乎所有可用的东西,即使是在 stackoverflow 上已经创建的问题。

问题:

是否需要在我的非 kafka 集群上也有 kafka 才能成功通信?(不要这么认为)

任何帮助,将不胜感激 ??

服务器配置:

# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://0.0.0.0:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://abc-Kka-00:9092```

Python code:

from pykafka import KafkaClient
import threading

KAFKA_HOST = "a.b.c.d:9092" # Or the address you want

client = KafkaClient(hosts = KAFKA_HOST)
topic = client.topics["kafkat"]

with topic.get_sync_producer() as producer:
    for i in range(10):
        message = "Test message " + str(i)
        encoded_message = message.encode("utf-8")
        producer.produce(encoded_message)


Error recived:
```pykafka.exceptions.ProduceFailureError: Delivery report not received after timeout```

Expected result:

Message to be passed and consumed on kafka node
4

1 回答 1

0

问题解决了。

问题是由于集群中的一台主机在配置中存在冲突,这就是消息无法通过的原因。更新配置后,它就像魅力一样。

于 2019-09-11T09:48:32.987 回答