I am trying to benchmark Apache Kafka, and right now I am measuring latency. To do this, I generate 10 MB of tweet data (about 14,500 tweets) so that I do not lose track of what is going on; later I could basically send arbitrarily more. For now, these 10 MB are only for testing purposes.
I have configured the broker to use LogAppendTime timestamps, which I compare against the current time on the consumer side. When I do this, I consistently get latency values that increase over time (see the details below), which is not what I expected.
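For reference, the LogAppendTime setting mentioned above corresponds to log.message.timestamp.type=LogAppendTime in the broker's server.properties, or to the per-topic config message.timestamp.type. A minimal sketch of how such a topic could be created with confluent-kafka's AdminClient (for illustration only; in my setup the property is set on the broker):

from confluent_kafka.admin import AdminClient, NewTopic

# Create the topic with broker-side (LogAppendTime) timestamps.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})
topic = NewTopic(
    "twitter-stream",
    num_partitions=1,
    replication_factor=1,
    config={"message.timestamp.type": "LogAppendTime"},
)
admin.create_topics([topic])["twitter-stream"].result()  # raises if creation failed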
Setup
- VM with Ubuntu 20
- 2 vCPUs
- 9 GB RAM
- confluent-kafka and Apache Kafka, both in their latest versions
- 1 partition, 1 Kafka broker, all default settings (except for LogAppendTime)
The producer and the consumer both run on the same machine (at least for now). The VM runs in a data center, so bandwidth should not be an issue.
Constants
CONSUMER_GROUP_ID = "twitter-consumers"
KAFKA_TOPIC_TWITTER = "twitter-stream"
Producer code
from confluent_kafka import Producer

producer_config = {
    "bootstrap.servers": "localhost:9092"
}
p = Producer(producer_config)

generation_steps = 1  # 10 MB per step
with open(TWITTER_DATA_PATH, "r") as dataset:
    for step in range(generation_steps):
        print(f"Executing data generation step {step}...")
        dataset.seek(0)  # Jump back to the first line
        for tweet in dataset:
            try:
                print("IN QUEUE: {}".format(len(p)))  # len(p) = messages awaiting delivery
                p.produce(KAFKA_TOPIC_TWITTER, value=tweet)
                p.poll(0)
            except BufferError:
                print('[INFO] Local producer queue is full (%d messages awaiting delivery): Trying again after flushing...\n' % len(p))
                p.flush()  # Send the messages in the queue
                # The tweet that caused the BufferError could not be sent, so after flushing we can send it now
                p.produce(KAFKA_TOPIC_TWITTER, value=tweet)
                p.poll(0)
print("Data generation done!")
Consumer code
import time

from confluent_kafka import Consumer, KafkaError

# Consume the data
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": CONSUMER_GROUP_ID,
    "client.id": "client-1",
    "enable.auto.commit": True,
    "session.timeout.ms": 6000,
    "default.topic.config": {"auto.offset.reset": "smallest"}
}
c = Consumer(consumer_config)
c.subscribe([KAFKA_TOPIC_TWITTER])

msg_count = 0
try:
    while True:
        msg = c.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            # msg.timestamp() returns (timestamp_type, timestamp in ms)
            delta = (time.time() * 1000) - msg.timestamp()[1]
            msg_count += 1
            print("Message {} received | Timestamp: {} | Latency: {}".format(msg_count, msg.timestamp()[1], delta))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print("End of partition reached {}/{}".format(msg.topic(), msg.partition()))
        else:
            print("Error occurred: {}".format(msg.error().str()))
except KeyboardInterrupt:
    pass
finally:
    c.close()
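For completeness, msg.timestamp() returns a (timestamp_type, timestamp) tuple. A small sketch (using a msg object from the poll loop above) to verify that the broker really stamps LogAppendTime, based on the timestamp-type constants exposed by confluent_kafka:

from confluent_kafka import TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME

ts_type, ts_ms = msg.timestamp()
if ts_type == TIMESTAMP_LOG_APPEND_TIME:
    print("Broker append time (ms): {}".format(ts_ms))
elif ts_type == TIMESTAMP_CREATE_TIME:
    print("Producer create time (ms): {}".format(ts_ms))
else:
    print("No timestamp available for this message")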
What I get
Consumer side
Message 1 received | Timestamp: 1618685061270 | Latency: 12.360107421875
Message 2 received | Timestamp: 1618685061270 | Latency: 13.025390625
Message 3 received | Timestamp: 1618685061270 | Latency: 13.08544921875
Message 4 received | Timestamp: 1618685061270 | Latency: 13.156005859375
Message 5 received | Timestamp: 1618685061270 | Latency: 13.1923828125
Message 6 received | Timestamp: 1618685061270 | Latency: 13.21875
Message 7 received | Timestamp: 1618685061270 | Latency: 13.245849609375
Message 8 received | Timestamp: 1618685061270 | Latency: 13.29833984375
Message 9 received | Timestamp: 1618685061270 | Latency: 13.348388671875
Message 10 received | Timestamp: 1618685061270 | Latency: 13.389892578125
Message 11 received | Timestamp: 1618685061270 | Latency: 13.43701171875
Message 12 received | Timestamp: 1618685061270 | Latency: 13.49755859375
Message 13 received | Timestamp: 1618685061270 | Latency: 13.54345703125
Message 14 received | Timestamp: 1618685061270 | Latency: 13.581787109375
Message 15 received | Timestamp: 1618685061270 | Latency: 13.626708984375
Message 16 received | Timestamp: 1618685061270 | Latency: 13.661376953125
Message 17 received | Timestamp: 1618685061270 | Latency: 13.695556640625
Message 18 received | Timestamp: 1618685061270 | Latency: 13.720703125
Message 19 received | Timestamp: 1618685061270 | Latency: 13.75537109375
Message 20 received | Timestamp: 1618685061270 | Latency: 13.78857421875
Message 21 received | Timestamp: 1618685061270 | Latency: 13.81396484375
[....]
Message 14485 received | Timestamp: 1618685061337 | Latency: 1061.004150390625
Producer side
Executing data generation step 0...
IN QUEUE: 0
IN QUEUE: 1
IN QUEUE: 2
IN QUEUE: 3
IN QUEUE: 4
IN QUEUE: 5
IN QUEUE: 6
IN QUEUE: 7
IN QUEUE: 8
IN QUEUE: 9
IN QUEUE: 10
IN QUEUE: 11
IN QUEUE: 12
IN QUEUE: 13
IN QUEUE: 14
IN QUEUE: 15
IN QUEUE: 16
[...]
IN QUEUE: 14484
Data generation done!
What I would expect
As you can see, I am computing the latency as delta = (time.time() * 1000) - msg.timestamp()[1]. I would expect the latency to vary between roughly 10 and 100 ms; I do not understand why it increases more or less linearly over time.
Also, I do not understand why the producer's buffer queue grows so large despite calling .poll(), since as far as I understand it, Kafka should drain the queue after each call.
Does anyone know what could be causing this and what I can do about it?