0

我想将 Spark Structured Streaming 作业中来自 Kafka 主题的消息读入数据帧。但是我在一个偏移量中获取整个消息,因此在数据帧中只有这条消息进入一行而不是多行。(在我的情况下是 3 行)

当我打印此消息时,我得到以下输出:

在此处输入图像描述

我想要在数据框中的 3 行中显示消息“Text1”、“Text2”和“Text3”,以便我可以进一步处理。

请帮我。

4

1 回答 1

1

您可以使用用户定义函数(UDF) 将消息字符串转换为字符串序列,然后在该列上应用explode函数,为序列中的每个元素创建一个新行:

如下图所示(在 scala 中,同样的原理也适用于 pyspark):

case class KafkaMessage(offset: Long, message: String)

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode

val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()

val splitString = udf { s: String => s.split('\n') }

df.withColumn("splitMsg", explode(splitString($"message")))
  .select("offset", "splitMsg")
  .show()

这将产生以下输出:

+------+--------+
|offset|splitMsg|
+------+--------+
|  1000|   Text1|
|  1000|   Text2|
|  1000|   Text3|
+------+--------+
于 2019-02-22T19:28:29.707 回答