I am reading messages from a Kafka topic:
messageDFRaw = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "test-message")\
.load()
messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")
When I print the DataFrame from the query above, I get the following console output:
|key|dict|
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"}|
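How can I create a DataFrame from the DataStreamReader so that it has the columns |key|channel|username|message|?
I tried following the accepted answer to "How to read records in JSON format from Kafka using Structured Streaming?":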
struct = StructType([
StructField("channel", StringType()),
StructField("username", StringType()),
StructField("message", StringType()),
])
messageDFRaw.select(from_json("CAST(value AS STRING)", struct))
However, I get "Expected type 'StructField', got 'StructType' instead" in from_json().