以下应用程序是一个简单的消费者,它将所有消息打印到控制台。
#!/usr/bin/env python
import confluent_kafka
consumer = confluent_kafka.Consumer({
'bootstrap.servers': 'kafka05-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': 'XXX',
'sasl.password': 'XXX',
'api.version.request': True,
'client.id': 'consumer01',
'group.id': 'group01',
})
consumer.subscribe(['logs'])
while True:
msg = consumer.poll(1)
if msg is not None and msg.error() is None:
print(msg.value().decode('utf-8'))
一开始它工作得很好。几个小时后,我看到以下错误消息。一旦我重新启动脚本,它就会再次正常工作。
^C%3|1504028772.615|失败|consumer01#consumer-1| [thrd:sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.]:sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.net:9093/7:初始化失败SASL 身份验证:代理不支持 SASL 握手(机制 PLAIN 要求)%3|1504028772.615|ERROR|consumer01#consumer-1| [thrd:sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.]:sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.net:9093/7:初始化失败SASL 身份验证:代理不支持 SASL 握手(机制 PLAIN 要求)