我们正在制作一个实时产品,它接收图像并发回相关信息。出于可扩展性的目的,我们决定使用 Kafka 来平衡 Kubenertes 节点之间的工作负载。
前端 -> MainWorker -> Kafka (1) -> Worker -> Kafka (2) -> MainWorker -> 前端
由于某些原因,Kafka(1)的生产者和消费者之间有一个意外的 80-100 毫秒,对于 kafka(2)也是如此
云部署与本地部署的延迟相同
生产者 1
client = KafkaClient(hosts=kafka_url)
topic = client.topics['frames']
producer = topic.get_producer(min_queued_messages=1)
data = {"frame": message,"uid": self.user, "size": len(message), "time": str(start)}
data = json.dumps(data, separators=(',', ':'))
data = data.encode()
producer.produce(data)
生产者/消费者 2
consumer = topic.get_balanced_consumer(consumer_group='detection', zookeeper_connect=zookeeper_url, auto_commit_enable=True, reset_offset_on_start=True)
while True:
message = consumer.consume()
if message is None:
continue
start = time.time()
message = json.loads(message.value.decode('utf-8'))
before = datetime.datetime.strptime(message["time"], '%Y-%m-%d %H:%M:%S.%f')
after = datetime.datetime.now()
print(f"Bouncing 1: {int((after - before).total_seconds() * 1000)}ms")
...
producer.produce(response_json.encode('utf-8'))
我们尝试了一些方法来减少延迟,但这看起来不会影响延迟。生产者/消费者都需要具有最低的延迟。我们不关心吞吐量。