This is my consumer configuration. I am using SSL for authentication.
from json import loads

from kafka import KafkaConsumer

# `config` is loaded elsewhere; it holds the topic, broker list and credentials.
consumer = KafkaConsumer(
    config['kafka_topic'],
    bootstrap_servers=config['kafka_brokers'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='recognition',
    ssl_cafile='../kafka.ca.pem',
    security_protocol="SSL",
    sasl_mechanism='PLAIN',
    sasl_plain_password=config['kafka_password'],
    sasl_plain_username=config['kafka_username'],
    api_version=(0, 10, 1),
    # Decode each message value from UTF-8 JSON.
    value_deserializer=lambda x: loads(x.decode('utf-8')),
)
Error:
ERROR:kafka.client:Unable to bootstrap from [('my_ip', 9094, <AddressFamily.AF_UNSPEC: 0>), ('my_ip_1', 9094, <AddressFamily.AF_UNSPEC: 0>), ('my_ip2', 9094, <AddressFamily.AF_UNSPEC: 0>)]
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name records-fetched
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-latency
DEBUG:kafka.metrics.metrics:Added sensor with name records-lag
DEBUG:kafka.metrics.metrics:Added sensor with name fetch-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name heartbeat-latency
DEBUG:kafka.metrics.metrics:Added sensor with name join-latency
DEBUG:kafka.metrics.metrics:Added sensor with name sync-latency
DEBUG:kafka.metrics.metrics:Added sensor with name commit-latency
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('my_topic',)
WARNING:kafka.consumer.subscription_state:subscription unchanged by change_subscription(['my_topic'])
DEBUG:kafka.consumer.group:Subscribed to topic(s): ['my_topic']
<mysql.connector.connection.MySQLConnection object at 0x12f6427b8>
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.coordinator:Requesting metadata for group coordinator request: NoBrokersAvailable
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
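One thing I am unsure about: the configuration sets security_protocol="SSL" but also passes SASL credentials, and as far as I understand kafka-python only uses the sasl_* options when security_protocol is SASL_PLAINTEXT or SASL_SSL. Below is a minimal sketch of how the keyword arguments could be assembled if the brokers on port 9094 actually expect SASL/PLAIN over TLS. That is an assumption about the cluster, not something the log confirms, and build_consumer_kwargs is a hypothetical helper name, not part of kafka-python.

```python
def build_consumer_kwargs(config, cafile='../kafka.ca.pem'):
    """Assemble KafkaConsumer keyword arguments (hypothetical helper)."""
    kwargs = {
        'bootstrap_servers': config['kafka_brokers'],
        'auto_offset_reset': 'earliest',
        'enable_auto_commit': True,
        'group_id': 'recognition',
        'ssl_cafile': cafile,
        'api_version': (0, 10, 1),
    }
    username = config.get('kafka_username')
    password = config.get('kafka_password')
    if username and password:
        # Assumption: the brokers expect SASL/PLAIN credentials over TLS,
        # which corresponds to security_protocol='SASL_SSL'.
        kwargs.update(
            security_protocol='SASL_SSL',
            sasl_mechanism='PLAIN',
            sasl_plain_username=username,
            sasl_plain_password=password,
        )
    else:
        # No credentials supplied: TLS only, no SASL options at all.
        kwargs['security_protocol'] = 'SSL'
    return kwargs


# Intended usage (requires kafka-python and reachable brokers):
#   consumer = KafkaConsumer(config['kafka_topic'],
#                            **build_consumer_kwargs(config))
```

Keeping the protocol choice in one place makes it harder to end up with a mix of SSL and SASL settings that the client silently ignores.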