0

我开发了一个 Python Kafka 生产者,它将多个 json 记录作为 nd-json 二进制字符串发送到 Kafka 主题。然后我尝试使用 PySpark 在 Spark Structured Streaming 中读取这些消息,如下所示:

events_df = select(from_json(col("value").cast("string"), schema).alias("value"))

但此代码仅适用于单个 json 文档。如果该值包含多条记录作为换行符分隔的 json,Spark 无法正确解码。

我不想为每个事件发送 kafka 消息。我怎样才能做到这一点?

4

1 回答 1

0

我设法以这种方式完成了我正在寻找的事情,用换行符拆分全文字符串,然后将数组分解成行以使用模式进行解析:

    events = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "quickstart-events") \
        .option("startingOffsets", "earliest")\
        .load()\
        .selectExpr("CAST(value AS STRING) as data")
    
    events = events.select(explode(split(events.data, '\n')))
    events = events.select(from_json(col("col"), event_schema).alias('value'))
    events = events.selectExpr('value.*')```
于 2021-02-03T08:55:16.090 回答