0

在 Ubuntu 服务器上,我设置了 Divolte Collector 以从网站收集点击流数据。数据正在写入名为 divolte-data 的 Kafka 通道。通过设置 Kafka 消费者,我可以看到传入的数据:

V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer
LinuxCanonical Ltd.

然后我想用 Airbnb Superset 可视化数据,它有几个连接到常见数据库的连接器,包括 druid.io(可以读取 Spark)。

似乎 Divolte 以非结构化方式将数据存储在 Kafka 中。但显然它可以以结构化的方式映射数据。输入数据是否应该用 JSON 结构化(就像文档说的那样)?

然后如何从 Druid-Tranquility 读取 divolte-data Kafka 通道接收到的数据?我尝试在 conf 示例中更改通道名称,但此使用者随后收到零消息。

4

1 回答 1

0

我找到的解决方案是我可以在 Python 中处理 Kafka 消息,例如使用 Kafka Python 库或 Confluent Kafka Python,然后我将使用 Avro 阅读器对消息进行解码。

编辑:这是我如何做到的更新:

我以为 Avro 库只是为了读取 Avro 文件,但它实际上解决了解码 Kafka 消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数将消息解码为字典,我可以在消费者循环中使用。

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

c = Consumer()
c.subscribe(topic)
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
于 2017-06-07T05:56:33.290 回答