我开发了一个 Python Kafka 生产者,它将多个 json 记录作为 nd-json 二进制字符串发送到 Kafka 主题。然后我尝试使用 PySpark 在 Spark Structured Streaming 中读取这些消息,如下所示:
events_df = select(from_json(col("value").cast("string"), schema).alias("value"))
但此代码仅适用于单个 json 文档。如果该值包含多条记录作为换行符分隔的 json,Spark 无法正确解码。
我不想为每个事件发送 kafka 消息。我怎样才能做到这一点?