0

我正在尝试使用 AvroProducer 的confluent_kafka类以 Avro 格式生成消息。Kafka 和 Schema-Registry 在同一网络中作为 3 个节点的集群运行。

我正在从字符串中读取模式并初始化 AvroProducer,如下所示:

value_schema = loads("""{"doc": "Messages to be written.",
        "namespace": "schemas.avro",
        "type": "record",
        "name": "kafkagwo",
        "fields": [
            {"name": "timestamp", "type": "string"},
            {"name": "message", "type": "string"}
        ]}""")
key_schema = loads('{"type": "string"}')
p = AvroProducer({'bootstrap.servers': 'broker1,broker2,broker3',
                 'schema.registry.url': 'http://broker1:8081'})

schema-registry 的监听器在 schema 注册服务器端设置为 http://localhost:8081,即 broker1。

然后尝试使用下面的代码发送消息

value = {'timestamp': timestamp, 'message': message}
p.produce(topic = 'topic-1', partition=0,
          key=str('key_0'),
          value=value, callback=delivery_report, 
          value_schema = value_schema, key_schema = key_schema)

我得到的是

ConnectionError: HTTPConnectionPool(host='broker1', port=8081): Max retries exceeded with url: /subjects/topic-1-value/versions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x00000165C1522D88>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))

我没有使用 Docker 容器。集群由 3 个独立的 VM 组成,其中安装并运行 Kafka 和 Registry Schema,因此它也不是独立的。Python 代码从具有网络访问权限和防火墙例外的第四个 VM 执行。事实上,我能够在没有 avro 和注册表模式的情况下生成和使用消息,所以我不认为这个问题与网络有关,但我对想法持开放态度。当我尝试将 Avro 与 Registry Schema 一起使用时,出现上述错误。此外,错误指出注册表模式端口 8081 所以问题应该与它有关,但我不知道还要寻找什么。

4

1 回答 1

0

问题是由于 schema-registry.properties 文件配置造成的。我已经设置了listeners=http://localhost:8081 ,但这对应于 127.0.0.1 并且不能从另一台机器访问。

我将其更改为 listeners=http://0.0.0.0:8081 并且有效。

于 2021-03-17T11:25:21.813 回答