This is my consumer configuration. I am using SSL for authentication.

from json import loads
from kafka import KafkaConsumer

consumer = KafkaConsumer(config['kafka_topic'],
                         bootstrap_servers=config['kafka_brokers'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='recognition',
                         ssl_cafile='../kafka.ca.pem',
                         security_protocol="SSL", sasl_mechanism='PLAIN',
                         sasl_plain_password=config['kafka_password'],
                         sasl_plain_username=config['kafka_username'],
                         api_version=(0, 10, 1),
                         value_deserializer=lambda x: loads(x.decode('utf-8')))

Error:

ERROR:kafka.client:Unable to bootstrap from [('my_ip', 9094, <AddressFamily.AF_UNSPEC: 0>), ('my_ip_1', 9094, <AddressFamily.AF_UNSPEC: 0>), ('my_ip2', 9094, <AddressFamily.AF_UNSPEC: 0>)]
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name records-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-latency
DEBUG:kafka.metrics.metrics:Added sensor with name records-lag
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name heartbeat-latency
DEBUG:kafka.metrics.metrics:Added sensor with name join-latency
DEBUG:kafka.metrics.metrics:Added sensor with name sync-latency
DEBUG:kafka.metrics.metrics:Added sensor with name commit-latency
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('my_topic',)
WARNING:kafka.consumer.subscription_state:subscription unchanged by change_subscription(['my_topic'])
DEBUG:kafka.consumer.group:Subscribed to topic(s): ['my_topic']
<mysql.connector.connection.MySQLConnection object at 0x12f6427b8>
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.coordinator:Requesting metadata for group coordinator request: NoBrokersAvailable
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
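
One thing that may be worth checking: the configuration above combines security_protocol="SSL" with sasl_* options. As far as I understand kafka-python, the sasl_* settings only take effect when security_protocol is SASL_PLAINTEXT or SASL_SSL, so with "SSL" the username and password would not be sent. Below is a minimal sketch of both variants, assuming the brokers on port 9094 expect either plain TLS or SASL/PLAIN over TLS (I have not confirmed which). The helper names build_consumer_kwargs and make_consumer are hypothetical, and this may not be the actual cause of NoBrokersAvailable.

```python
import json


def build_consumer_kwargs(config, use_sasl):
    """Hypothetical helper: build keyword arguments for kafka.KafkaConsumer.

    use_sasl=True  -> SASL/PLAIN credentials over TLS (SASL_SSL)
    use_sasl=False -> TLS only (SSL), no username/password
    """
    kwargs = {
        "bootstrap_servers": config["kafka_brokers"],
        "auto_offset_reset": "earliest",
        "enable_auto_commit": True,
        "group_id": "recognition",
        "ssl_cafile": "../kafka.ca.pem",
        "api_version": (0, 10, 1),
        "value_deserializer": lambda x: json.loads(x.decode("utf-8")),
    }
    if use_sasl:
        # Assumption: the broker listener is configured for SASL over TLS.
        kwargs.update(
            security_protocol="SASL_SSL",
            sasl_mechanism="PLAIN",
            sasl_plain_username=config["kafka_username"],
            sasl_plain_password=config["kafka_password"],
        )
    else:
        # TLS only: the sasl_* options are left out entirely.
        kwargs["security_protocol"] = "SSL"
    return kwargs


def make_consumer(config, use_sasl):
    """Create the consumer; kafka-python is imported lazily here."""
    from kafka import KafkaConsumer

    return KafkaConsumer(config["kafka_topic"],
                         **build_consumer_kwargs(config, use_sasl))
```

Calling make_consumer(config, use_sasl=True) and then make_consumer(config, use_sasl=False) would show whether either protocol setting gets past the bootstrap step against the same brokers.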