1

我有一个df:

col1 col2
1 abcdefghi
2 qwertyuio

我想重复每一行,将 col2 分成 3 个长度为 3 的子字符串:

col1 col2
1 abcdefghi
1 美国广播公司
1 定义
1
2 qwertyuio
2 qwe
2 rty
2 uio

我试图创建一个新的Seqcontainsng列Seq((col("col1"), substring(col("col2"),0,3))...)

    val df1 = df.withColumn("col3", Seq(
(col("col1"), substring(col("col2"),0,3)),
(col("col1"), substring(col("col2"),3,3)),
(col("col1"), substring(col("col2"),6,3)) ))

我的想法是选择那个新列,并减少它,得到一个最终的 Seq。然后将其传递给 DF 并将其附加到初始 df。

我在 withColumn 中遇到错误,例如:

Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.$colon$colon
4

2 回答 2

1

您可以改用 Spark 数组函数:

val df1 = df.union(
    df.select(
        $"col1",
        explode(array(
            substring(col("col2"),0,3),
            substring(col("col2"),3,3),
            substring(col("col2"),6,3)
       )).as("col2")
    )
)

df1.show
+----+---------+
|col1|     col2|
+----+---------+
|   1|abcdefghi|
|   2|qwertyuio|
|   1|      abc|
|   1|      cde|
|   1|      fgh|
|   2|      qwe|
|   2|      ert|
|   2|      yui|
+----+---------+
于 2021-01-15T08:04:50.457 回答
1

你也可以使用 udf,

val df = spark.sparkContext.parallelize(Seq((1L,"abcdefghi"), (2L,"qwertyuio"))).toDF("col1","col2")
df.show(false)
// input
+----+---------+
|col1|col2     |
+----+---------+
|1   |abcdefghi|
|2   |qwertyuio|
+----+---------+
 
// udf
val getSeq = udf((col2: String) => col2.split("(?<=\\G...)"))
df.withColumn("col2", explode(getSeq($"col2")))
  .union(df).show(false)

+----+---------+
|col1|col2     |
+----+---------+
|1   |abc      |
|1   |ghi      |
|1   |abcdefghi|
|1   |def      |
|2   |qwe      |
|2   |rty      |
|2   |uio      |
|2   |qwertyuio|
+----+---------+
于 2021-01-15T08:13:10.713 回答