我正在尝试在 Kafka 中配置 SSL(在我的 Windows 本地安装)。我正在使用 confluent-kafka python 客户端。大多数解决方案都是针对 java 的,它涉及创建一些无法清楚理解的信任库、密钥库和 jass 配置。此外,我必须在属性文件(服务器/生产者/消费者)中进行哪些更改也不清楚。
这是生产者.py
# ./producer.py
import certifi
from confluent_kafka import Producer
if __name__ == '__main__':
topic = "demo_ssl"
conf = {
'bootstrap.servers': "localhost:9093",
'security.protocol': 'SASL_SSL',
'ssl.ca.location': certifi.where(),
'sasl.mechanism': 'PLAIN',
'sasl.username': 'user',
'sasl.password':'password',
}
# Create Producer instance
producer = Producer(**conf)
delivered_records = 0
# on_delivery handler (triggered by poll() or flush())
# when a message has been successfully delivered or permanently failed delivery after retries.
def acked(err, msg):
global delivered_records
"""Delivery report handler called on
successful or failed delivery of message"""
if err is not None:
print("Failed to deliver message: {}".format(err))
else:
delivered_records += 1
print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))
for n in range(10):
record_key = "messageKey" + str(n)
record_value = "messageValue" + str(n)
print("Producing record: {}\t{}".format(record_key, record_value))
producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
# p.poll() serves delivery reports (on_delivery) from previous producer() calls.
producer.poll(0)
producer.flush()
print("{} messages were producerd to topic {}!".format(delivered_records, topic))
这是消费者.py
# .\consumer.py
import certifi
from confluent_kafka import Consumer
if __name__ == '__main__':
topic = 'demo_ssl'
conf = {
'bootstrap.servers': 'localhost:9093',
'security.protocol': 'SASL_SSL',
'group.id': 'group_ssl',
'ssl.ca.location': certifi.where(),
'sasl.mechanism': 'PLAIN',
'sasl.username': 'user',
'sasl.password': 'password',
}
# Create Consumer instance
consumer = Consumer(conf)
# Subscribe to topic
consumer.subscribe([topic])
# Process messages
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
# No message available within timeout.
# Initial message consumption may take up to
# `session.timeout.ms` for the consumer group to
# rebalance and start consuming
print("Waiting for message or event/error in poll()")
continue
elif msg.error():
print('error: {}'.format(msg.error()))
else:
# Check for Kafka message
record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
record_value = msg.value().decode('utf-8')
print("Consumed record with key " + record_key + " and value " + record_value)
except KeyboardInterrupt:
pass
finally:
print("Leave group and commit final offsets")
consumer.close()
我没有更改 server.properties 或 consumer/producer.properties 中的任何服务器配置。当我运行我的消费者时,我收到了这个错误:
Waiting for message or event/error in poll()
Waiting for message or event/error in poll()
%3|1641896886.032|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv6#[::1]:9093 failed: Unknown error (after 2048ms in state CONNECT)
对于生产者.py:
Producing record: messageKey0 messageValue0
Producing record: messageKey1 messageValue1
Producing record: messageKey2 messageValue2
Producing record: messageKey3 messageValue3
Producing record: messageKey4 messageValue4
Producing record: messageKey5 messageValue5
Producing record: messageKey6 messageValue6
Producing record: messageKey7 messageValue7
Producing record: messageKey8 messageValue8
Producing record: messageKey9 messageValue9
%3|1641897033.087|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Unknown error (after 2056ms in state CONNECT)
%3|1641897036.073|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv6#[::1]:9093 failed: Unknown error (after 2041ms in state CONNECT)
%3|1641897039.120|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv6#[::1]:9093 failed: Unknown error (after 2062ms in state CONNECT, 1 identical error(s) suppressed)
%3|1641897042.133|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Unknown error (after 2055ms in state CONNECT)