
Question: pyspark dataframe withColumn command not working

I have an input dataframe: df_input (updated df_input)

|comment|inp_col|inp_val|
|-------|-------|-------|
|11     |a      |a1     |
|12     |a      |a2     |
|12     |f      |&a     |
|12     |f      |f9     |
|15     |b      |b3     |
|16     |b      |b4     |
|17     |c      |&b     |
|17     |c      |c5     |
|17     |d      |&c     |
|17     |d      |d6     |
|17     |e      |&d     |
|17     |e      |e7     |

As you can see, inp_col and inp_val form a hierarchy, and it can run any number of levels deep from a root value. The root parent values here are "a" and "b".

Now, per my requirement, I have to replace each child value that starts with "&" with its corresponding parent values. I tried iterating over the list of inp_val values that start with "&" and substituting the corresponding value list on each iteration, but it did not work. The problem I am stuck on is how to build the list that holds the parent and child values.

Code I tried:

from pyspark.sql.functions import when, regexp_replace

# collect every inp_val that starts with '&'
list_1 = [row['inp_val'] for row in tst.select(tst.inp_val).where(tst.inp_val.substr(0, 1) == '&').collect()]
# removing the '&' at the start of every list value
list_2 = [list_val[1:] for list_val in list_1]
# strip the '&' marker in the column itself
tst_1 = tst.withColumn("val_extract", when(tst.inp_val.substr(0, 1) == '&', regexp_replace(tst.inp_val, "&", "")).otherwise(tst.inp_val))
for val in list_2:
    # look up the values that belong to this parent
    df_leaf = tst_1.select(tst_1.val_extract).where(tst_1.inp_col == val)
    list_3 = [row['val_extract'] for row in df_leaf.collect()]

    # flag the rows that reference this parent, then overwrite them
    tst_1 = tst_1.withColumn('bool', when(tst_1.val_extract == val, 'True').otherwise('False'))
    tst_1 = tst_1.withColumn('val_extract', when(tst_1.bool == 'True', str(list_3)).otherwise(tst_1.val_extract)).drop('bool')
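
For reference, a minimal sketch that builds the sample dataframe the snippet above operates on (the variable name tst follows the snippet; the rows match the df_input table). Note that when(..., str(list_3)) stores the Python list's string representation as a string literal rather than an array column, which is one reason the loop cannot produce the expected arrays:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows matching the df_input table above
data = [
    (11, "a", "a1"), (12, "a", "a2"), (12, "f", "&a"), (12, "f", "f9"),
    (15, "b", "b3"), (16, "b", "b4"), (17, "c", "&b"), (17, "c", "c5"),
    (17, "d", "&c"), (17, "d", "d6"), (17, "e", "&d"), (17, "e", "e7"),
]
tst = spark.createDataFrame(data, ["comment", "inp_col", "inp_val"])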

Updated expected output:

|comment|inp_col|inp_val|inp_extract                  |
|-------|-------|-------|-----------------------------|
|11     |a      |a1     |['a1']                       |
|12     |a      |a2     |['a2']                       |
|12     |f      |&a     |['a1', 'a2']                 |
|12     |f      |f9     |['f9']                       |
|15     |b      |b3     |['b3']                       |
|16     |b      |b4     |['b4']                       |
|17     |c      |&b     |['b3', 'b4']                 |
|18     |c      |c5     |['c5']                       |
|19     |d      |&c     |['b3', 'b4', 'c5']           |
|20     |d      |d6     |['d6']                       |
|21     |e      |&d     |['b3', 'b4', 'c5', 'd6']     |
|22     |e      |e7     |['e7']                       |

After that I can try an explode to get multiple rows. But the output above is what we need, and so far I have not been able to get even part of the way to it.
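
For the explode step mentioned above, a minimal sketch (df_out is a hypothetical name for a dataframe that already carries the inp_extract array column shown in the expected output):

from pyspark.sql import functions as F

# turn each element of the inp_extract array into its own row
df_exploded = df_out.withColumn("inp_extract", F.explode("inp_extract"))
df_exploded.show()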


2 Answers


If you really want to avoid using graphs and your case is no more complicated than the one shown above, try this.

from pyspark.sql import functions as F

df.show()  # sample dataframe

#+-------+---------+---------+
#|comment|input_col|input_val|
#+-------+---------+---------+
#|     11|        a|       a1|
#|     12|        a|       a2|
#|     12|        f|       &a|
#|     12|        f|       f9|
#|     15|        b|       b3|
#|     16|        b|       b4|
#|     17|        c|       &b|
#|     17|        c|       c5|
#|     17|        d|       &c|
#|     17|        d|       d6|
#|     17|        e|       &d|
#|     17|        e|       e7|
#+-------+---------+---------+

# Step 1: for every input_col, collect its values into an array (y1),
# then attach that array to any row whose input_val references it.
df1 = df.join(df.groupBy("input_col").agg(F.collect_list("input_val").alias("y1"))
                .withColumnRenamed("input_col", "x1"),
              F.expr("""input_val rlike x1"""), 'left')\
        .withColumn("new_col", F.when(F.expr("""substring(input_val,0,1)!='&'"""), F.array("input_val"))
                                .otherwise(F.col("y1")))\
        .drop("x1", "y1")

# Step 2: self-join to pull in the arrays of '&' references one level down.
df2 = df1.join(df1.selectExpr("input_val as input_val1", "new_col as new_col1"),
               F.expr("""array_contains(new_col,input_val1) and substring(input_val1,0,1)=='&'"""), 'left')

# Step 3: one more self-join for the next level, then concatenate the
# arrays and keep only resolved (non-null, non-'&') values.
df2.join(df2.selectExpr("input_val1 as val2", "new_col1 as col2").dropna(),
         F.expr("""array_contains(new_col1,val2)"""), 'left')\
   .withColumn("inp_extract", F.when(F.expr("""substring(input_val,0,1)!='&'"""), F.col("new_col"))
                               .otherwise(F.expr("""filter(concat(
                                       coalesce(new_col, array()),
                                       coalesce(new_col1, array()),
                                       coalesce(col2, array())),
                                   x -> x is not null and substring(x,0,1)!='&')""")))\
   .select("comment", "input_col", "input_val", F.array_sort("inp_extract").alias("inp_extract")).show()

#+-------+---------+---------+----------------+
#|comment|input_col|input_val|     inp_extract|
#+-------+---------+---------+----------------+
#|     11|        a|       a1|            [a1]|
#|     12|        a|       a2|            [a2]|
#|     12|        f|       &a|        [a1, a2]|
#|     12|        f|       f9|            [f9]|
#|     15|        b|       b3|            [b3]|
#|     16|        b|       b4|            [b4]|
#|     17|        c|       &b|        [b3, b4]|
#|     17|        c|       c5|            [c5]|
#|     17|        d|       &c|    [b3, b4, c5]|
#|     17|        d|       d6|            [d6]|
#|     17|        e|       &d|[b3, b4, c5, d6]|
#|     17|        e|       e7|            [e7]|
#+-------+---------+---------+----------------+
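
Note that each self-join in this chain resolves one more level of "&" indirection, so the depth it handles is fixed by the number of joins (three levels here, enough for the sample's &d -> &c -> &b chain). An arbitrarily deep hierarchy would need additional joins or a graph-based approach.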
answered 2020-07-10 22:27

You can join the dataframe to itself to get this.

Input:

df.show()

+-------+-------+---------+
|comment|inp_col|input_val|
+-------+-------+---------+
|     11|      a|       a1|
|     12|      a|       a2|
|     13|      f|       &a|
|     14|      b|       b3|
|     15|      b|       b4|
|     16|      d|       &b|
+-------+-------+---------+

import pyspark.sql.functions as F

# df1 keeps the raw values; in df2 the '&' marker is stripped so that a
# reference like '&a' can be matched against inp_col 'a'.
df.createOrReplaceTempView("df1")
df.withColumn("input_val", F.regexp_replace(F.col("input_val"), "&", "")).createOrReplaceTempView("df2")

spark.sql("""select * from (select coalesce(df2.comment, df1.comment) as comment,
  coalesce(df2.inp_col, df1.inp_col) as inp_col,
  coalesce(df2.input_val, df1.input_val) as input_val,
  case when df1.input_val is not null then df1.input_val else df2.input_val end as output
  from df1 full outer join df2 on df2.input_val = df1.inp_col) where input_val is not null order by comment""").show()
Output:
+-------+-------+---------+------+
|comment|inp_col|input_val|output|
+-------+-------+---------+------+
|     11|      a|       a1|    a1|
|     12|      a|       a2|    a2|
|     13|      f|        a|    a1|
|     13|      f|        a|    a2|
|     14|      b|       b3|    b3|
|     15|      b|       b4|    b4|
|     16|      d|        b|    b3|
|     16|      d|        b|    b4|
+-------+-------+---------+------+
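
This single self-join resolves one level of "&" references, which is all the simplified input above needs. For the nested case in the question (&d referencing &c referencing &b), the join would have to be repeated until no "&" values remain. A sketch of that iterative variant, assuming the same column names and an acyclic hierarchy (the loop below is illustrative, not part of the original answer):

import pyspark.sql.functions as F

# iteratively replace '&x' references with the values of column x,
# repeating until no '&' values are left (assumes the hierarchy is
# acyclic, otherwise this loop would not terminate)
resolved = df
while resolved.where(F.col("input_val").startswith("&")).count() > 0:
    refs = resolved.where(F.col("input_val").startswith("&")) \
        .withColumn("ref_col", F.expr("substring(input_val, 2)"))
    expanded = refs.alias("r").join(
            resolved.alias("p"),
            F.col("r.ref_col") == F.col("p.inp_col")) \
        .select(F.col("r.comment"), F.col("r.inp_col"),
                F.col("p.input_val").alias("input_val"))
    resolved = resolved.where(~F.col("input_val").startswith("&")) \
        .unionByName(expanded)

resolved.orderBy("comment").show()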
answered 2020-07-10 17:45