0

参与挑战,它说:您的第一步 - 使用来自 Apache Kafka 的数据样本。 所以他们给了我主题名称API_KEYAPI_SECRET。哦,还有引导服务器。然后他们声称好像您不熟悉 Kafka,Confluent 提供了全面的文档。好吧,登录到 confluent,创建一个集群,然后.. 消费数据的下一步是什么?

4

1 回答 1

1

这是将来自 Kafka 的消息放入 Python 列表的基本模式。

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'someTopicName',
     bootstrap_servers=['192.168.1.160:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')))
print("We have a consumer instantiated")
print(consumer)

messageCache = []

for message in consumer:
    messageCache.append(message.value)

在这种情况下,我的 Kafka 代理在我的私​​有 LAN 上,使用默认端口,所以我的引导服务器列表只是 ["192.168.1.160:9092"]。

您可以使用标准计数器和 if 语句将列表保存到文件或其他任何内容,因为假定 Kafka 流将永远运行。例如,我有一个进程使用 Kafka 消息并将它们作为 parquet 中的数据帧保存到 HDFS,每 1,000,000 条消息。在这种情况下,我想保存历史消息以开发 ML 模型。Kafka 的伟大之处在于我可以编写另一个流程来实时评估并可能响应每条消息。

于 2020-11-04T20:21:16.193 回答