1

我有两个数据框,df1如下df2所示:

df1.show()
+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   0    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+


df2.show()
+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | a   | b  |   ?    |
|  C|   ghi  | a   | c  |   ?    |
+---+--------+-----+----+--------+

如果在中引用该行,我想更新c5df1并将其设置为。每条记录由和列标识。1df2c1c2

以下是所需的输出;请注意,c5第一条记录的值已更新为1

+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   1    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+
4

1 回答 1

2

将 df2 左连接到 df1 并使用case when ..表达式 for c5

from pyspark.sql.functions import when,*
joined_dfs = df1.join(df2,(df1.c1 == df2.c1) & (df1.c2 == df2.c2),'left').select('df1.*')
joined_dfs.select(joined_dfs.c1,joined_dfs.c2,joined_dfs.c3,joined_dfs.c4) \
          .withColumn('c5',when((joined_dfs.c1.isNotNull()) & (joined_dfs.c2.isNotNull()),1).otherwise(0)) \ 
          .show()
于 2019-07-12T14:29:28.287 回答