我正在尝试使用 AIOKafka 仓库中的示例 ssl_consume_produce.py(https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py)的修改版本,连接到我在 Confluent Cloud 上的 Kafka 集群。我已经用我认为正确的参数配置了我的 AIOKafkaConsumer 和 AIOKafkaProducer,但出现以下运行时错误:
/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
Traceback (most recent call last):
File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 57, in <module>
loop.run_until_complete(task)
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 52, in <module>
loop.run_until_complete(task)
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 23, in produce_and_consume
await producer.start()
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
await self.client.bootstrap()
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
version_hint=version_hint)
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
await conn.connect()
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
await self._do_sasl_handshake()
File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
payload, expect_response = res
RuntimeError: await wasn't used with future
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f9bc818d350>
我改编的代码版本是:
import asyncio
from ssl import create_default_context, Purpose
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from kafka.common import TopicPartition
import ccloud_lib
# TLS context used by both Kafka clients; trusts the CA bundle in cacert.pem.
ssl_context = create_default_context(
    purpose=Purpose.SERVER_AUTH,
    cafile='cacert.pem',
)
# Client settings loaded by ccloud_lib from kafka_config.conf; the code below
# reads the 'bootstrap.servers', 'sasl.username' and 'sasl.password' keys.
conf = ccloud_lib.read_ccloud_config(
    'kafka_config.conf',
)
async def produce_and_consume(loop):
    """Produce one message to ``my_topic`` and read it back over SASL_SSL.

    Connection settings are taken from the module-level ``conf`` mapping and
    ``ssl_context``. On success, prints ``Success`` followed by the produce
    result and the fetched record.

    Args:
        loop: Event loop handed to both aiokafka clients.
    """
    # Settings shared by the producer and the consumer.
    connection = dict(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        security_protocol='SASL_SSL',
        ssl_context=ssl_context,
        sasl_mechanism='PLAIN',
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username'],
    )

    # Produce
    producer = AIOKafkaProducer(**connection)
    await producer.start()
    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    # Consume
    # The topic has to be given to the consumer so that its partitions are
    # assigned; seek() on a partition that is not assigned is rejected.
    consumer = AIOKafkaConsumer('my_topic', **connection)
    await consumer.start()
    try:
        # Rewind to the offset of the message produced above.
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(produce_and_consume(loop))
    try:
        loop.run_until_complete(task)
    finally:
        # Run one more loop iteration so pending callbacks get a chance to
        # finish. asyncio.sleep() no longer accepts ``loop`` (deprecated in
        # Python 3.8, removed in 3.10), so it is not passed here.
        loop.run_until_complete(asyncio.sleep(0))
        # Cancelling is a no-op when the task has already finished.
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass
我的配置 conf(敏感信息已做混淆处理)如下所示:
bootstrap.servers=*****.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="********************" password\="****************************************";
sasl.username=********************
sasl.password=********************************************************************************
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=********************:********************
schema.registry.url=https://********************.us-central1.gcp.confluent.cloud
是否可以使用 AIOKafka 客户端连接 Confluent Cloud?我的配置有什么不正确的地方吗?