我正在尝试使用 AvroProducer 的confluent_kafka类以 Avro 格式生成消息。Kafka 和 Schema-Registry 在同一网络中作为 3 个节点的集群运行。
我正在从字符串中读取模式并初始化 AvroProducer,如下所示:
value_schema = loads("""{"doc": "Messages to be written.",
"namespace": "schemas.avro",
"type": "record",
"name": "kafkagwo",
"fields": [
{"name": "timestamp", "type": "string"},
{"name": "message", "type": "string"}
]}""")
key_schema = loads('{"type": "string"}')
p = AvroProducer({'bootstrap.servers': 'broker1,broker2,broker3',
'schema.registry.url': 'http://broker1:8081'})
schema-registry 的监听器在 schema 注册服务器端设置为 http://localhost:8081,即 broker1。
然后尝试使用下面的代码发送消息
value = {'timestamp': timestamp, 'message': message}
p.produce(topic = 'topic-1', partition=0,
key=str('key_0'),
value=value, callback=delivery_report,
value_schema = value_schema, key_schema = key_schema)
我得到的是
ConnectionError: HTTPConnectionPool(host='broker1', port=8081): Max retries exceeded with url: /subjects/topic-1-value/versions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x00000165C1522D88>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
我没有使用 Docker 容器。集群由 3 个独立的 VM 组成,其中安装并运行 Kafka 和 Registry Schema,因此它也不是独立的。Python 代码从具有网络访问权限和防火墙例外的第四个 VM 执行。事实上,我能够在没有 avro 和注册表模式的情况下生成和使用消息,所以我不认为这个问题与网络有关,但我对想法持开放态度。当我尝试将 Avro 与 Registry Schema 一起使用时,出现上述错误。此外,错误指出注册表模式端口 8081 所以问题应该与它有关,但我不知道还要寻找什么。