
I have some simple code to test the performance of the aiokafka library. I'm on a Windows machine running Docker for Windows, with an 8-core VM.

In this setup, aiokafka seems to achieve very low producer throughput, roughly 1 MiB per second:

import asyncio
import cProfile
import io
import pstats
import time
from multiprocessing import Process

import uvloop
from aiokafka import AIOKafkaProducer

import bigass_object_factory  # local test helper that builds the ~1 MiB payload


async def send_many():
  f = bigass_object_factory.BigAssObjectFactory()
  v = f.create_bytearray()
  print(f'{len(v)} length {type(v)} named v')

  producer = AIOKafkaProducer(
    bootstrap_servers='kafka:9092',
    loop=asyncio.get_event_loop(),
    acks=0,  # fire-and-forget: no broker acknowledgement required
  )

  # profile everything from producer start to stop
  pr = cProfile.Profile()
  pr.enable()

  await producer.start()

  total_sent = 0
  started = time.time()

  for i in range(100):
    await producer.send('test_topic_aiokafka', value=v)
    print(f'sent {i}')

  await producer.stop()

  pr.disable()
  s = io.StringIO()
  sortby = 'cumtime'
  ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
  ps.print_stats()
  print(s.getvalue())


def send_many_main():
  asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  asyncio.get_event_loop().run_until_complete(send_many())


p1 = Process(target=send_many_main)
p1.start()
p1.join()

This takes over 60 seconds to run on my machine, i.e. more than 60 seconds to send 100 messages of 1 MiB each. I know that is a big message for Kafka, but this is ridiculous, and I'm fairly sure this is not how it should perform. When I profile it with cProfile I see this:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      169    0.003    0.000   70.619    0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:260(_send_produce_req)
      169    0.008    0.000   70.615    0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:732(do)
      100    0.015    0.000   70.132    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:701(create_request)
      100    0.003    0.000   70.103    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:236(get_data_buffer)
      100    0.001    0.000   70.101    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:83(_build)
      100    0.000    0.000   70.099    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:526(build)
      100    0.119    0.001   70.099    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:482(write_header)
      100    0.000    0.000   69.968    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/util.py:115(calc_crc32c_py)
      100    0.001    0.000   69.968    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:130(crc)
      100   69.967    0.700   69.967    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:100(crc_update)
      169    0.004    0.000    0.473    0.003 /usr/local/lib/python3.9/site-packages/aiokafka/client.py:460(send)
      104    0.104    0.001    0.466    0.004 /usr/local/lib/python3.9/site-packages/aiokafka/conn.py:374(send)
      208    0.001    0.000    0.294    0.001 /usr/local/lib/python3.9/site-packages/kafka/util.py:155(__call__)
      208    0.004    0.000    0.293    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/struct.py:40(_encode_self)
  408/208    0.025    0.000    0.289    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:143(encode)
      107    0.002    0.000    0.270    0.003 /opt/project/tests/contrib/aiokafka/test_aiokafka_performance.py:34(send_many)
  408/208    0.003    0.000    0.237    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:146(<listcomp>)
  202/102    0.002    0.000    0.233    0.002 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:181(encode)

So it spends all of its time in _crc32c.py. Looking at that code, I concluded that there is no way to avoid calling it on the producer side. (Obviously, if you couldn't tell already, I'm not a Kafka expert and don't know why this checksum is needed.)

But it seems the reason it's so slow is that it is using the pure-Python implementation of crc32c rather than a native one. At least that's what I'm hoping...
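
To sanity-check that theory, I timed a single checksum of a same-sized payload directly against the pure-Python implementation that shows up in the profile. This is only a minimal sketch: the aiokafka.record._crc32c.crc import path and its bytes-like argument are assumptions taken from the profile output above and may differ between aiokafka versions.

import time
from aiokafka.record._crc32c import crc  # the pure-Python fallback seen in the profile

payload = bytes(1024 * 1024)  # stand-in for the ~1 MiB message value

started = time.time()
crc(payload)  # a single CRC-32C pass over the whole payload
print(f'pure-Python crc32c over 1 MiB: {time.time() - started:.3f} s')

If that single call lands anywhere near the 0.700 s per-call cumtime reported for crc_update above, the checksum alone accounts for essentially all of the 70 seconds.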

So my questions are:

  1. Does this indicate that crc32c is the problem here?
  2. How do I make sure I'm using a native crc32c implementation? (See the sketch after this list for how I tried to check.)
  3. Has anyone seen this before when using aiokafka on python:buster-3.9.x?
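
For question 2, the closest thing to a check I've found is inspecting which record-builder implementation aiokafka picked at import time. This is only a sketch based on the module layout visible in the profile (the pure-Python classes live in aiokafka/record/default_records.py, with compiled counterparts presumably available as a C extension); the exact attribute names are assumptions and may vary between aiokafka versions.

import aiokafka.record.default_records as dr

# If a compiled extension was built and imported, the builder class should not
# resolve to the pure-Python default_records module that dominates the profile.
# (I believe setting AIOKAFKA_NO_EXTENSIONS=1 forces the pure-Python path, so
# that variable should also not be set in the container.)
builder = dr.DefaultRecordBatchBuilder
print(builder, builder.__module__)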
