3

这个项目是为了real time search engine - log analysis表演。

我有一个从 Spark 处理到 Kafka 的实时流数据。

现在有了 Kafka 输出,我想get the data from the Kafka using Flask.. 和visualize it using Chartjs/或其他一些可视化..

如何从中获取实时流数据Kafka using the python flask

知道我该如何开始吗?

任何帮助将不胜感激!

谢谢!

4

2 回答 2

2

我会查看 python 的 Kafka 包:

http://kafka-python.readthedocs.org/en/master/usage.html

这应该可以让您设置从 Kafka 流式传输数据。此外,我可能会查看这个项目:https ://github.com/travel-intelligence/flasfka ,它与一起使用 Flask 和 Kafka 有关(刚刚在谷歌搜索中找到)。

于 2016-02-05T18:54:42.717 回答
1

我正在解决类似的问题(带有来自 Kafka 的实时流数据的小型 Flask 应用程序)。

你必须做几件事来设置它。首先,您需要一个KafkaConsumer来获取消息:

from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id='groupid', boostrap_servers=kafkakserver)
consumer.subscribe(topics=['topicid'])

try:
    # this method should auto-commit offsets as you consume them.
    # If it doesn't, turn on logging.DEBUG to see why it gets turned off.
    # Not assigning a group_id can be one cause
    for msg in consumer:
        # TODO: process the kafka messages.
finally:
    # Always close your producers/consumers when you're done
    consumer.close()

这是关于最基本的KafkaConsumer。for循环阻塞线程并循环,直到它提交最后一条消息。还有consumer.poll()一种方法可以在给定的时间内获取您可以获取的消息,具体取决于您希望如何构建数据流。Kafka 在设计时考虑了长期运行的消费者进程,但如果您正确提交消息,您也可以根据需要打开和关闭消费者。

现在你有了数据,所以你可以用 Flask 将它流式传输到浏览器。我对 ChartJS 不熟悉,但来自 Flask 的实时流式传输yield集中在调用一个在循环内结束的 Python 函数,而不仅仅是return在处理结束时的 a。

查看Michael Grinberg 的博客他对流媒体的跟进作为使用 Flask 进行流媒体的实际示例。(注意:任何实际在严肃的 Web 应用程序中流式传输视频的人都可能希望使用 ffmpy 将其编码为广泛使用的 H.264 之类的视频编解码器,并将其包装在 MPEG-DASH 中......或者可能选择一个可以执行更多操作的框架这些东西给你。)

于 2017-09-21T00:55:08.650 回答