我正在解决类似的问题(带有来自 Kafka 的实时流数据的小型 Flask 应用程序)。
你必须做几件事来设置它。首先,您需要一个KafkaConsumer来获取消息:
from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id='groupid', boostrap_servers=kafkakserver)
consumer.subscribe(topics=['topicid'])
try:
# this method should auto-commit offsets as you consume them.
# If it doesn't, turn on logging.DEBUG to see why it gets turned off.
# Not assigning a group_id can be one cause
for msg in consumer:
# TODO: process the kafka messages.
finally:
# Always close your producers/consumers when you're done
consumer.close()
这是关于最基本的KafkaConsumer。for
循环阻塞线程并循环,直到它提交最后一条消息。还有consumer.poll()
一种方法可以在给定的时间内获取您可以获取的消息,具体取决于您希望如何构建数据流。Kafka 在设计时考虑了长期运行的消费者进程,但如果您正确提交消息,您也可以根据需要打开和关闭消费者。
现在你有了数据,所以你可以用 Flask 将它流式传输到浏览器。我对 ChartJS 不熟悉,但来自 Flask 的实时流式传输yield
集中在调用一个在循环内结束的 Python 函数,而不仅仅是return
在处理结束时的 a。
查看Michael Grinberg 的博客和他对流媒体的跟进作为使用 Flask 进行流媒体的实际示例。(注意:任何实际在严肃的 Web 应用程序中流式传输视频的人都可能希望使用 ffmpy 将其编码为广泛使用的 H.264 之类的视频编解码器,并将其包装在 MPEG-DASH 中......或者可能选择一个可以执行更多操作的框架这些东西给你。)