5

我有两个数据框,DF1 和 DF2。DF1 是主控,DF2 是增量。来自 DF2 的数据应插入 DF1 或用于更新 DF1 数据。

假设 DF1 具有以下格式:

证件号码 开始日期 数量
1 2016-01-01 4650 22
2 2016-01-02 3130 45
1 2016-01-03 4456 22
2 2016-01-15 1234 45

DF2 包含以下内容:

证件号码 开始日期 数量
1 2016-01-01 8650 52
2 2016-01-02 7130 65
1 2016-01-06 3456 20
2 2016-01-20 2345 19
3 2016-02-02 1345 19

我需要组合这两个数据帧,如果 DF2 的“id_no”和“开始日期”与 DF1 匹配,则应在 DF1 中替换它,如果不匹配,则应将其插入 DF1。“id_no”不是唯一的。

预期结果:

证件号码 开始日期 数量
1 2016-01-01 8650 52
2 2016-01-02 7130 65
1 2016-01-03 4456 22
2 2016-01-15 1234 45
1 2016-01-06 3456 20
2 2016-01-20 2345 19
3 2016-02-02 1345 19
4

3 回答 3

7

id_no您可以在和上加入两个数据框start_date,然后coalesceamountdays列中加入df2来自第一个的列:

import pyspark.sql.functions as f

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    f.coalesce('b.amount', 'a.amount').alias('amount'), 
    f.coalesce('b.days', 'a.days').alias('days')
).show()

+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+

如果您有更多列:

cols = ['amount', 'days']

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    *(f.coalesce('b.' + col, 'a.' + col).alias(col) for col in cols)
).show()
+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+
于 2018-08-24T15:32:17.957 回答
0

我现在正在自己研究这个。看起来 spark支持 SQL 的 MERGE INTO应该对这个任务有好处。您只需要创建一个 new_id 是 id_no 和 start_date 的连接

MERGE INTO df1
USING df2
ON df1.new_id = df2.new_id
WHEN MATCHED THEN
  UPDATE SET df1.amount = df2.amount, df1.days = df2.days
WHEN NOT MATCHED
  THEN INSERT *
于 2021-11-02T16:43:29.063 回答
0

union如果两个 dfs 具有相同的结构,则应该这样做。

from pyspark.sql import functions as F
grp_by = {'id_no', 'start_date'}
df = df2.union(df1)
df = df.groupby(*grp_by).agg(*[F.first(c).alias(c) for c in set(df.columns)-grp_by])
df.show()
#     +-----+----------+----+------+
#     |id_no|start_date|days|amount|
#     +-----+----------+----+------+
#     |    1|2016-01-06|  20|  3456|
#     |    2|2016-01-20|  19|  2345|
#     |    1|2016-01-03|  22|  4456|
#     |    3|2016-02-02|  19|  1345|
#     |    2|2016-01-15|  45|  1234|
#     |    1|2016-01-01|  52|  8650|
#     |    2|2016-01-02|  65|  7130|
#     +-----+----------+----+------+
于 2021-03-15T11:01:33.290 回答