1

我正在阅读来自 kafka 主题的消息

messageDFRaw = spark.readStream\
                    .format("kafka")\
                    .option("kafka.bootstrap.servers", "localhost:9092")\
                    .option("subscribe", "test-message")\
                    .load()

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")

当我从上述查询打印数据框时,我得到以下控制台输出。

|key|dict|
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"}|

如何从 DataStreamReader 创建一个数据框,以便我有一个包含列的数据框|key|channel| username| message|

我尝试按照 如何使用结构化流从 Kafka 读取 JSON 格式的记录中接受的答案?

struct = StructType([
    StructField("channel", StringType()),
    StructField("username", StringType()),
    StructField("message", StringType()),
])

messageDFRaw.select(from_json("CAST(value AS STRING)", struct))

但是,我进去Expected type 'StructField', got 'StructType' insteadfrom_json()

4

1 回答 1

1

我忽略了Expected type 'StructField', got 'StructType' instead中的警告from_json()

但是,我最初必须从 kafka 消息中转换值,然后再解析 json 模式。

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

messageParsedDF = messageDF.select(from_json("value", struct_schema).alias("message"))

messageFlattenedDF = messageParsedDF.selectExpr("value.channel", "value.username", "value.message")
于 2019-02-16T23:08:17.400 回答