
Here is my join:

df = df_small.join(df_big, 'id', 'leftanti')

It seems I can only broadcast the right-hand DataFrame. But for my logic (a left anti join) to work, df_small has to be on the left.

How can I broadcast the DataFrame on the left side?


Example:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()

df_small = spark.range(2)
df_big = spark.range(1, 5000000)

#    df_small     df_big
#    +---+        +-------+
#    | id|        |     id|
#    +---+        +-------+
#    |  0|        |      1|
#    |  1|        |      2|
#    +---+        |    ...|
#                 |4999999|
#                 +-------+

df_small = F.broadcast(df_small)
df = df_small.join(df_big, 'id', 'leftanti')
df.show()
df.explain()

#    +---+
#    | id|
#    +---+
#    |  0|
#    +---+
#
#    == Physical Plan ==
#    AdaptiveSparkPlan isFinalPlan=false
#    +- SortMergeJoin [id#197L], [id#199L], LeftAnti
#       :- Sort [id#197L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#197L, 200), ENSURE_REQUIREMENTS, [id=#1406]
#       :     +- Range (0, 2, step=1, splits=2)
#       +- Sort [id#199L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#199L, 200), ENSURE_REQUIREMENTS, [id=#1407]
#             +- Range (1, 5000000, step=1, splits=2)
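
Note that simply swapping the two sides is not an option. The hint would then be honored, since for a left anti join Spark can build (broadcast) the right side, but the join would answer the opposite question, returning the rows of df_big that are not in df_small:

# Swapped sides (illustration only): the broadcast hint takes effect here,
# but this computes rows of df_big missing from df_small -- not what I need.
df_big.join(F.broadcast(df_small), 'id', 'leftanti').explain()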

2 Answers


Unfortunately, this is not possible.

Spark can broadcast the left-side table only for a right outer join.
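
As a minimal sketch of that one supported case (my illustration, reusing the question's df_small and df_big), a right outer join lets Spark build the left side, so the plan should show a BroadcastHashJoin with BuildLeft if the hint is honored:

# Right outer join: the join type where the LEFT side may be broadcast.
F.broadcast(df_small).join(df_big, 'id', 'right_outer').explain()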

You can get the desired result by splitting the left anti join into two joins: an inner join followed by a left join.

from pyspark.sql.functions import broadcast, col
from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame([1, 2, 3, 4, 5], IntegerType())
df2 = spark.createDataFrame([(1, 'a'), (2, 'b')], ['value', 'col'])
inner = df1.join(broadcast(df2), 'value', 'inner')  # keys present in both frames
out = df1.join(broadcast(inner), 'value', 'left').where(col('col').isNull()).drop('col')
out.show()
+-----+
|value|
+-----+
|    3|
|    4|
|    5|
+-----+

df1.join(df2, 'value', 'left_anti').show()
+-----+
|value|
+-----+
|    5|
|    3|
|    4|
+-----+
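
The two outputs contain the same rows; the ordering differs only because of partitioning. As a quick order-insensitive check (my addition), exceptAll confirms the split-join result matches the native left_anti:

# Should print 0: no rows in `out` are missing from the native left_anti result.
print(out.exceptAll(df1.join(df2, 'value', 'left_anti')).count())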
answered 2021-08-23T16:56:59.260

Based on this idea, I created a function.

The logic:

df_inner = df_big.join(F.broadcast(df_small), 'id', 'inner').select('id').distinct()
df_out = df_small.join(F.broadcast(df_inner), 'id', 'leftanti')

As a function:

def leftanti_on_small(df_small, df_big, on):
    # Broadcast df_small into the inner join, then broadcast the (at most
    # equally small) set of matched keys into the final left anti join.
    df_inner = df_big.join(F.broadcast(df_small), on, 'inner').select(on).distinct()
    return df_small.join(F.broadcast(df_inner), on, 'leftanti')

df_out = leftanti_on_small(df_small, df_big, on='id')

Result:

df_out.show()
df_out.explain()

#  +---+
#  | id|
#  +---+
#  |  0|
#  +---+
#  
#  == Physical Plan ==
#  AdaptiveSparkPlan isFinalPlan=false
#  +- BroadcastHashJoin [id#209L], [id#211L], LeftAnti, BuildRight, false
#     :- Range (0, 2, step=1, splits=2)
#     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#1587]
#        +- HashAggregate(keys=[id#211L], functions=[])
#           +- HashAggregate(keys=[id#211L], functions=[])
#              +- Project [id#211L]
#                 +- BroadcastHashJoin [id#211L], [id#217L], Inner, BuildRight, false
#                    :- Range (1, 5000000, step=1, splits=2)
#                    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#1581]
#                       +- Range (0, 2, step=1, splits=2)
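
Note that both broadcasts stay small: F.broadcast(df_small) ships the original small frame, and df_inner can never hold more rows than df_small has distinct keys, so the second broadcast is bounded by the first.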
answered 2021-08-24T11:53:13.727