I have two DataFrames, `left` and `right`. The latter, `right`, is a subset of `left`: every row of `right` also appears in `left`. I want to remove the redundant rows from `left` with a simple `left_anti` join against `right`.
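For reference, my mental model of `left_anti` is that it keeps only the rows on the left side that have no match on the right under the join condition. A minimal sketch of that behavior (toy data and names, unrelated to the real schema below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_a = spark.createDataFrame([(1,), (2,), (3,)], ["id"])
df_b = spark.createDataFrame([(2,), (3,)], ["id"])

# Keeps only the rows of df_a with no matching id in df_b -> just id=1.
df_a.join(df_b, on="id", how="left_anti").show()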
I found that the join does not work if I use a filtered version of `right`. It only works if I rebuild the `right` DataFrame from scratch.
- What is going on here?
- Is there a workaround that does not involve recreating the `right` DataFrame? (One direction I have been poking at is sketched after the reproduction below.)

Here is a minimal reproduction:
from pyspark.sql import Row, SparkSession
import pyspark.sql.types as t
schema = t.StructType(
    [
        t.StructField("street_number", t.IntegerType()),
        t.StructField("street_name", t.StringType()),
        t.StructField("lower_street_number", t.IntegerType()),
        t.StructField("upper_street_number", t.IntegerType()),
    ]
)

data = [
    # Row that conflicts w/ the range row, and should be removed
    Row(
        street_number=123,
        street_name="Main St",
        lower_street_number=None,
        upper_street_number=None,
    ),
    # Range row
    Row(
        street_number=None,
        street_name="Main St",
        lower_street_number=120,
        upper_street_number=130,
    ),
]

def join_files(left_side, right_side):
    # Anti-join away every left row whose street_number falls inside a
    # [lower_street_number, upper_street_number] range on the right.
    join_condition = [
        (
            (right_side.lower_street_number.isNotNull())
            & (right_side.upper_street_number.isNotNull())
            & (right_side.lower_street_number <= left_side.street_number)
            & (right_side.upper_street_number >= left_side.street_number)
        )
    ]
    return left_side.join(right_side, join_condition, "left_anti")
spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema)

# Filtered down from `left` itself: the join does NOT work with this one.
right_fail = left.filter("lower_street_number IS NOT NULL")
result = join_files(left, right_fail)
result.count()  # Returns 2 - both rows still present

# Rebuilt from scratch: the join works with this one.
right_success = spark.createDataFrame([data[1]], schema)
result = join_files(left, right_success)
result.count()  # Returns 1 - the "left_anti" join worked as expected
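On the workaround question: the only lead I have so far (an assumption on my part, not verified against this exact reproduction) is that `right_fail` shares its lineage with `left`, so this is effectively a self-join, and self-joins are usually disambiguated by aliasing both sides and referencing columns through `pyspark.sql.functions.col` instead of attribute access. A sketch of the variant I mean:

import pyspark.sql.functions as F

def join_files_aliased(left_side, right_side):
    # Hypothetical alias-based variant of join_files: qualified
    # col("r.x") / col("l.x") references, rather than right_side.x
    # attribute access, so the two sides can be told apart.
    l, r = left_side.alias("l"), right_side.alias("r")
    join_condition = (
        F.col("r.lower_street_number").isNotNull()
        & F.col("r.upper_street_number").isNotNull()
        & (F.col("r.lower_street_number") <= F.col("l.street_number"))
        & (F.col("r.upper_street_number") >= F.col("l.street_number"))
    )
    return l.join(r, join_condition, "left_anti")

Another suggestion I have seen for lineage problems like this is round-tripping through the RDD, e.g. spark.createDataFrame(right_fail.rdd, schema), but that feels very close to the "recreate the DataFrame" step I am trying to avoid.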