I am trying to benchmark Apache Kafka, and right now I am trying to measure latency. To do this, I am generating 10 MB of tweet data (~14,500 tweets) so that I don't lose track of what is going on, but I could basically send arbitrarily more data later. So these 10 MB are for testing purposes only for now.

I have configured the broker to use LogAppendTime timestamps, which I compare against the current time on the consumer side. When doing so, I consistently get latency values that increase over time (see details below), which is not what I expected.
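
For reference, a minimal sketch of how a topic with this timestamp type can be created via the confluent-kafka AdminClient; alternatively, setting log.message.timestamp.type=LogAppendTime in the broker's server.properties applies it broker-wide:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Topic-level override of the timestamp type
topic = NewTopic("twitter-stream", num_partitions=1, replication_factor=1,
                 config={"message.timestamp.type": "LogAppendTime"})

futures = admin.create_topics([topic])
futures["twitter-stream"].result()  # block until the topic actually exists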

Setup

  • VM with Ubuntu 20
  • 2 vCPUs
  • 9 GB RAM
  • confluent-kafka / Apache Kafka in the latest version
  • 1 partition, 1 Kafka broker, all default settings (except for LogAppendTime)

Both the producer and the consumer run on the same machine (at least for now). The VM runs in a data center, so bandwidth should not be an issue.

Constants

CONSUMER_GROUP_ID = "twitter-consumers"
KAFKA_TOPIC_TWITTER = "twitter-stream"

Producer code

from confluent_kafka import Producer

producer_config = {
    "bootstrap.servers": "localhost:9092"
}
p = Producer(producer_config)

generation_steps = 1 # 10 MBs per step
with open(TWITTER_DATA_PATH, "r") as dataset:
    for step in range(generation_steps):
        print(f"Executing data generation step {step}...")
        dataset.seek(0) # Jump back to first line  
 
        for tweet in dataset:
            try:
                print("IN QUEUE: {}".format(len(p)))
                p.produce(KAFKA_TOPIC_TWITTER, value=tweet)
                p.poll(0)
            except BufferError:
                print('[INFO] Local producer queue is full (%d messages awaiting delivery): Trying again after flushing...\n' % len(p)) 
                p.flush() # Send the messages in the queue

                # The tweet that caused the BufferError could not be sent, so after flushing, we can now send it
                p.produce(KAFKA_TOPIC_TWITTER, value=tweet)
                p.poll(0)

print("Data generation done!")

Consumer code

# Consume the data
import time

from confluent_kafka import Consumer, KafkaError

consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": CONSUMER_GROUP_ID,
    "client.id": "client-1",
    "enable.auto.commit": True,
    "session.timeout.ms": 6000,
    "default.topic.config": {"auto.offset.reset": "smallest"}
}
c = Consumer(consumer_config)

c.subscribe([KAFKA_TOPIC_TWITTER])

msg_count = 0
try:
    while True:
        msg = c.poll(0.1)
        
        if msg is None:
            continue
        elif not msg.error():
            delta = (time.time() * 1000) - msg.timestamp()[1]
            msg_count += 1
            print("Message {} received | Timestamp: {} | Latency: {}".format(msg_count, msg.timestamp()[1], delta))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print("End of partition reached {}/{}".format(msg.topic(), msg.partition()))
        else:
            print("Error occurred: {}".format(msg.error().str()))
except KeyboardInterrupt:
    pass
finally:
    c.close()

What I get

Consumer side

Message 1 received | Timestamp: 1618685061270 | Latency: 12.360107421875
Message 2 received | Timestamp: 1618685061270 | Latency: 13.025390625
Message 3 received | Timestamp: 1618685061270 | Latency: 13.08544921875
Message 4 received | Timestamp: 1618685061270 | Latency: 13.156005859375
Message 5 received | Timestamp: 1618685061270 | Latency: 13.1923828125
Message 6 received | Timestamp: 1618685061270 | Latency: 13.21875
Message 7 received | Timestamp: 1618685061270 | Latency: 13.245849609375
Message 8 received | Timestamp: 1618685061270 | Latency: 13.29833984375
Message 9 received | Timestamp: 1618685061270 | Latency: 13.348388671875
Message 10 received | Timestamp: 1618685061270 | Latency: 13.389892578125
Message 11 received | Timestamp: 1618685061270 | Latency: 13.43701171875
Message 12 received | Timestamp: 1618685061270 | Latency: 13.49755859375
Message 13 received | Timestamp: 1618685061270 | Latency: 13.54345703125
Message 14 received | Timestamp: 1618685061270 | Latency: 13.581787109375
Message 15 received | Timestamp: 1618685061270 | Latency: 13.626708984375
Message 16 received | Timestamp: 1618685061270 | Latency: 13.661376953125
Message 17 received | Timestamp: 1618685061270 | Latency: 13.695556640625
Message 18 received | Timestamp: 1618685061270 | Latency: 13.720703125
Message 19 received | Timestamp: 1618685061270 | Latency: 13.75537109375
Message 20 received | Timestamp: 1618685061270 | Latency: 13.78857421875
Message 21 received | Timestamp: 1618685061270 | Latency: 13.81396484375
[....]
Message 14485 received | Timestamp: 1618685061337 | Latency: 1061.004150390625

Producer side

Executing data generation step 0...
IN QUEUE: 0
IN QUEUE: 1
IN QUEUE: 2
IN QUEUE: 3
IN QUEUE: 4
IN QUEUE: 5
IN QUEUE: 6
IN QUEUE: 7
IN QUEUE: 8
IN QUEUE: 9
IN QUEUE: 10
IN QUEUE: 11
IN QUEUE: 12
IN QUEUE: 13
IN QUEUE: 14
IN QUEUE: 15
IN QUEUE: 16
[...]
IN QUEUE: 14484
Data generation done!

What I would expect

As you can see, I am computing the latency as delta = (time.time() * 1000) - msg.timestamp()[1]. I would expect the latency to vary between roughly 10 and 100 ms at most. I don't understand why the latency increases "linearly" over time.

Also, I don't understand why the producer's buffer queue grows so large despite calling .poll(), since as far as I know, Kafka should clear the queue after each call.

Does anyone know what could be causing this and what I can do about it?

1 Answer

I found the solution, and it is quite obvious: printing in Python takes a lot of time, especially if you do it many thousands of times...

After moving the print statement out of the consumer loop and evaluating the latencies there, they vary between ~2 and ~30 ms.
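
For reference, a minimal sketch of the adjusted loop (same consumer c as above; the summary at the end is just one way to evaluate the collected latencies):

latencies = []
try:
    while True:
        msg = c.poll(0.1)
        if msg is None:
            continue
        if not msg.error():
            # Only record the latency here -- no print in the hot loop
            latencies.append((time.time() * 1000) - msg.timestamp()[1])
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print("Error occurred: {}".format(msg.error().str()))
except KeyboardInterrupt:
    pass
finally:
    c.close()

# Evaluate after the loop has finished
if latencies:
    print("Messages: {} | min latency: {:.2f} ms | max latency: {:.2f} ms".format(
        len(latencies), min(latencies), max(latencies)))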

Answered 2021-04-18T13:18:42.740