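I have some simple code to test the performance of the aiokafka library. I'm on a Windows machine running Docker for Windows with an 8-core VM.
In this setup, aiokafka seems to achieve very low producer throughput, roughly 1 MiB per second: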
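
import asyncio
import cProfile
import io
import pstats
import time
from multiprocessing import Process

import uvloop
from aiokafka import AIOKafkaProducer

import bigass_object_factory  # my own helper module that builds the test payload


async def send_many():
    # Build one ~1 MiB payload and reuse it for every message.
    f = bigass_object_factory.BigAssObjectFactory()
    v = f.create_bytearray()
    print(f'{len(v)} length {type(v)} named v')
    producer = AIOKafkaProducer(
        bootstrap_servers='kafka:9092',
        loop=asyncio.get_event_loop(),
        acks=0,
    )
    # Profile everything from producer start to producer stop.
    pr = cProfile.Profile()
    pr.enable()
    await producer.start()
    total_sent = 0
    started = time.time()
    for i in range(100):
        await producer.send('test_topic_aiokafka', value=v)
        print(f'sent {i}')
    await producer.stop()
    pr.disable()
    # Print the profile sorted by cumulative time.
    s = io.StringIO()
    sortby = 'cumtime'
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())


def send_many_main():
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    asyncio.get_event_loop().run_until_complete(send_many())


# Run the producer in a separate process.
p1 = Process(target=send_many_main)
p1.start()
p1.join()
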
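This takes more than 60 seconds to run on my machine, i.e. it takes over 60 seconds to send 100 messages of 1 MiB each. I know that is a large message for Kafka, but this is ridiculous, and I'm pretty sure this is not how it is supposed to perform. When I profile it with cProfile, I see this: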
ncalls tottime percall cumtime percall filename:lineno(function)
169 0.003 0.000 70.619 0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:260(_send_produce_req)
169 0.008 0.000 70.615 0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:732(do)
100 0.015 0.000 70.132 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:701(create_request)
100 0.003 0.000 70.103 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:236(get_data_buffer)
100 0.001 0.000 70.101 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:83(_build)
100 0.000 0.000 70.099 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:526(build)
100 0.119 0.001 70.099 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:482(write_header)
100 0.000 0.000 69.968 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/util.py:115(calc_crc32c_py)
100 0.001 0.000 69.968 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:130(crc)
100 69.967 0.700 69.967 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:100(crc_update)
169 0.004 0.000 0.473 0.003 /usr/local/lib/python3.9/site-packages/aiokafka/client.py:460(send)
104 0.104 0.001 0.466 0.004 /usr/local/lib/python3.9/site-packages/aiokafka/conn.py:374(send)
208 0.001 0.000 0.294 0.001 /usr/local/lib/python3.9/site-packages/kafka/util.py:155(__call__)
208 0.004 0.000 0.293 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/struct.py:40(_encode_self)
408/208 0.025 0.000 0.289 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:143(encode)
107 0.002 0.000 0.270 0.003 /opt/project/tests/contrib/aiokafka/test_aiokafka_performance.py:34(send_many)
408/208 0.003 0.000 0.237 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:146(<listcomp>)
202/102 0.002 0.000 0.233 0.002 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:181(encode)
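So it spends essentially all of its time in _crc32c.py (about 70 seconds cumulative, roughly 0.7 seconds per message). Looking at that code, I concluded that there is no way to avoid calling it on the producer side. (In case it isn't obvious by now, I'm not a Kafka expert and I don't know why this check is needed.)
But it seems the reason it is so slow is that it is using the Python implementation of crc32c rather than the native one. That's what I'm hoping, at least...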
So the questions are:
- Does this indicate that crc32c is the problem?
- How can I make sure I'm using the native crc32c?
- Has anyone seen this before when using aiokafka on python:buster-3.9.x?
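
One way to approach the second question is to check whether the compiled extension module can be found at all and whether it resolves to a binary file. The sketch below assumes the extension lives at aiokafka.record._crecords; that module name is an assumption about aiokafka's package layout and may differ between versions. The helper is_compiled_extension is a hypothetical name, not part of any library.

```python
import importlib.machinery
import importlib.util


def is_compiled_extension(module_name: str) -> bool:
    """Return True if module_name resolves to a compiled extension file."""
    try:
        spec = importlib.util.find_spec(module_name)
    except (ImportError, ValueError):
        # Parent package is missing, or the module has no usable spec.
        return False
    if spec is None or not spec.origin:
        return False
    # Compare the file name against this interpreter's extension suffixes
    # (for example ".so" on Linux or ".pyd" on Windows).
    return spec.origin.endswith(tuple(importlib.machinery.EXTENSION_SUFFIXES))


if __name__ == "__main__":
    # ASSUMPTION: the module name below is a guess at where aiokafka keeps
    # its compiled record helpers; adjust it for the installed version.
    name = "aiokafka.record._crecords"
    print(name, "compiled:", is_compiled_extension(name))
```

If this reports False inside the container, the pure-Python fallback is probably the code path being used, which would be consistent with calc_crc32c_py showing up in the profile.
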