
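I'm trying to configure SSL for Kafka on a local Windows installation, using the confluent-kafka Python client. Most of the solutions I've found target Java and involve creating truststores, keystores, and a JAAS configuration, none of which I understand clearly. It's also unclear which changes I need to make in the properties files (server/producer/consumer).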

Here is producer.py:

# ./producer.py

import certifi
from confluent_kafka import Producer

if __name__ == '__main__':

    topic = "demo_ssl"
    conf = {
        'bootstrap.servers': "localhost:9093",
        'security.protocol': 'SASL_SSL',

        'ssl.ca.location': certifi.where(),

        'sasl.mechanism': 'PLAIN',
        'sasl.username': 'user',
        'sasl.password':'password',
    }

    # Create Producer instance
    producer = Producer(**conf)
    delivered_records = 0

    # on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently failed delivery after retries.
    def acked(err, msg):
        """Delivery report handler called on
        successful or failed delivery of message"""
        global delivered_records
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))

    
    for n in range(10):
        record_key = "messageKey" + str(n)
        record_value = "messageValue" + str(n)
        print("Producing record: {}\t{}".format(record_key, record_value))
        producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
        # producer.poll() serves delivery reports (on_delivery) from previous produce() calls.
        producer.poll(0)

    
    producer.flush()
    print("{} messages were produced to topic {}!".format(delivered_records, topic))

Here is consumer.py:

# .\consumer.py

import certifi
from confluent_kafka import Consumer


if __name__ == '__main__':

    topic = 'demo_ssl'
    conf = {
        'bootstrap.servers': 'localhost:9093',
        'security.protocol': 'SASL_SSL',
        'group.id': 'group_ssl',
        'ssl.ca.location': certifi.where(),

        'sasl.mechanism': 'PLAIN',
        'sasl.username': 'user',
        'sasl.password': 'password',
    }

    # Create Consumer instance
    consumer = Consumer(conf)

    # Subscribe to topic
    consumer.subscribe([topic])

    # Process messages
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # No message available within timeout.
                # Initial message consumption may take up to 
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting for message or event/error in poll()")
                continue
            elif msg.error():
                print('error: {}'.format(msg.error()))
            else:
                # Check for Kafka message
                record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                record_value = msg.value().decode('utf-8')
                print("Consumed record with key " + record_key + " and value " + record_value)
    except KeyboardInterrupt:
        pass
    finally:
        print("Leave group and commit final offsets")
        consumer.close()
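
Both scripts pass certifi.where() as 'ssl.ca.location'. That file is certifi's bundle of public root CAs, so it would presumably not verify a broker certificate signed by a locally generated CA. A minimal sketch of building the client settings with an explicit CA file instead; the helper name build_client_conf and the path in the usage line are hypothetical, and the remaining values are copied from the scripts above:

```python
def build_client_conf(ca_path, group_id=None):
    """Return client settings that trust the CA certificate at ca_path.

    ca_path is assumed to be a PEM file containing the CA that signed
    the broker's certificate (hypothetical; it is not created here).
    """
    conf = {
        'bootstrap.servers': 'localhost:9093',
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': ca_path,
        'sasl.mechanism': 'PLAIN',
        'sasl.username': 'user',
        'sasl.password': 'password',
    }
    # Only consumers need a consumer group id.
    if group_id is not None:
        conf['group.id'] = group_id
    return conf


if __name__ == '__main__':
    # Hypothetical path, shown for illustration only.
    print(build_client_conf('C:/kafka/ssl/ca-cert.pem', group_id='group_ssl'))
```

The same dictionary could then be handed to Producer(conf) or Consumer(conf), so the producer and the consumer share one definition of the TLS and SASL settings.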

I have not changed any configuration in server.properties or in consumer.properties/producer.properties. When I run the consumer, I get this error:

Waiting for message or event/error in poll()
Waiting for message or event/error in poll()
%3|1641896886.032|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv6#[::1]:9093 failed: Unknown error (after 2048ms in state CONNECT)

And for producer.py:

Producing record: messageKey0   messageValue0
Producing record: messageKey1   messageValue1
Producing record: messageKey2   messageValue2
Producing record: messageKey3   messageValue3
Producing record: messageKey4   messageValue4
Producing record: messageKey5   messageValue5
Producing record: messageKey6   messageValue6
Producing record: messageKey7   messageValue7
Producing record: messageKey8   messageValue8
Producing record: messageKey9   messageValue9
%3|1641897033.087|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Unknown error (after 2056ms in state CONNECT)
%3|1641897036.073|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv6#[::1]:9093 failed: Unknown error (after 2041ms in state CONNECT)
%3|1641897039.120|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv6#[::1]:9093 failed: Unknown error (after 2062ms in state CONNECT, 1 identical error(s) suppressed)
%3|1641897042.133|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Unknown error (after 2055ms in state CONNECT)
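
Every failure above happens in state CONNECT, which suggests the TCP connection to port 9093 is never established, before any TLS or SASL exchange takes place. That would be consistent with an unmodified server.properties, where the broker listens with PLAINTEXT on port 9092 by default. A minimal sketch for checking which ports accept connections; the helper name can_connect is hypothetical and it uses only the standard library:

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and resolution failures.
        return False


if __name__ == '__main__':
    # 9092 is the default PLAINTEXT port; 9093 is the port used by the scripts.
    for port in (9092, 9093):
        state = "open" if can_connect("localhost", port) else "closed or unreachable"
        print("localhost:{} is {}".format(port, state))
```

If 9093 turns out to be closed while 9092 is open, the likely next step is on the broker side (adding an SSL or SASL_SSL listener in server.properties) rather than in the Python clients.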