
I have two dataframes, left and right. The latter, right, is a subset of left: left contains all of the rows that right does. I want to use right to remove the redundant rows from left by doing a simple "left_anti" join.

I have found that the join does not work if I use a filtered version of left as the right side. It only works if I rebuild the right dataframe from scratch.

  • What is going on here?
  • Is there a workaround that doesn't involve re-creating the right dataframe?
from pyspark.sql import Row, SparkSession

import pyspark.sql.types as t

schema = t.StructType(
    [
        t.StructField("street_number", t.IntegerType()),
        t.StructField("street_name", t.StringType()),
        t.StructField("lower_street_number", t.IntegerType()),
        t.StructField("upper_street_number", t.IntegerType()),
    ]
)
data = [
    # Row that conflicts w/ range row, and should be removed
    Row(
        street_number=123,
        street_name="Main St",
        lower_street_number=None,
        upper_street_number=None,
    ),
    # Range row
    Row(
        street_number=None,
        street_name="Main St",
        lower_street_number=120,
        upper_street_number=130,
    ),
]


def join_files(left_side, right_side):
    join_condition = [
      (
        (right_side.lower_street_number.isNotNull())
        & (right_side.upper_street_number.isNotNull())
        & (right_side.lower_street_number <= left_side.street_number)
        & (right_side.upper_street_number >= left_side.street_number)
      )
    ]
    return left_side.join(right_side, join_condition, "left_anti")


spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema)

right_fail = left.filter("lower_street_number IS NOT NULL")
result = join_files(left, right_fail)
result.count() # Returns 2 - both rows still present


right_success = spark.createDataFrame([data[1]], schema)
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected

1 Answer


You can alias the DFs:


from pyspark.sql import SparkSession  # data and schema are reused from the question
import pyspark.sql.functions as F


def join_files(left_side, right_side):
    join_condition = [
      (
        (F.col("right_side.lower_street_number").isNotNull())
        & (F.col("right_side.upper_street_number").isNotNull())
        & (F.col("right_side.lower_street_number") <= F.col("left_side.street_number"))
        & (F.col("right_side.upper_street_number") >= F.col("left_side.street_number"))
      )
    ]
    return left_side.join(right_side, join_condition, "left_anti")


spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema).alias("left_side")


right_fail = left.filter("lower_street_number IS NOT NULL").alias("right_side")
result = join_files(left, right_fail)
print(result.count()) # Returns 1 - with the aliases the left_anti join removes the conflicting row


right_success = spark.createDataFrame([data[1]], schema).alias("right_side")
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected

I don't know which pyspark version you are on, but with pyspark==3.0.1 I get the following explanatory error when running the original, un-aliased code:

AnalysisException: Column lower_street_number#522, upper_street_number#523, lower_street_number#522, upper_street_number#523 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.;
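
A related workaround, sketched here only as an untested illustration (join_files_renamed and the r_ prefix are hypothetical names, and the columns are assumed to be the ones from the question): rename every right-side column before joining, so its attribute references can no longer collide with the left side even when the right side was produced by filtering the left.

import pyspark.sql.functions as F


def join_files_renamed(left_side, right_side):
    # Prefix every right-side column; select(... .alias(...)) gives the renamed
    # columns fresh expression IDs, so they no longer overlap with left_side's.
    renamed = right_side.select(
        [F.col(c).alias(f"r_{c}") for c in right_side.columns]
    )
    join_condition = [
        (
            F.col("r_lower_street_number").isNotNull()
            & F.col("r_upper_street_number").isNotNull()
            & (F.col("r_lower_street_number") <= left_side.street_number)
            & (F.col("r_upper_street_number") >= left_side.street_number)
        )
    ]
    return left_side.join(renamed, join_condition, "left_anti")


right_fail = left.filter("lower_street_number IS NOT NULL")
join_files_renamed(left, right_fail).count()  # Expected to return 1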

answered 2021-10-14T07:18:54.470