2

我有一个运行我的PySpark测试的单元测试(使用PyTest ) 。我有正常的创建SQLContext。我想在所有情况下都得到相同的 uuid4,所以我在测试中修补了 uuid4。如果我从测试函数调用,一切都很好。conftest.pyuuid.uuid4()

但是,当我运行也调用 uuid4 的 PySpark 作业时,它没有被修补:

我的 PySpark 函数(简化):

def create_uuid_if_needed(current, prev):
    if current > prev:
        return str(uuid.uuid4())
    else:
        return None


def my_df_func(df):
    my_udf = udf(create_uuid_if_needed, T.StringType())    
    my_window = Window.partitionBy(F.col(PARTITIONING_KEY)).orderBy(F.col(ORDER))
    return df.withColumn('new_col', my_udf(df.col, F.lag(df.col, 1)).over(my_window))

我的测试如下所示:

@patch.object(uuid, 'uuid4', return_value='1-1-1-1')
def test_add_activity_period_start_id(mocker, sql_context, input_fixture):
    input_df = sql_context.createDataFrame(input_fixture, [... schema...])    
    good_uuid = str(uuid.uuid4())
    another_goood_uuid = create_uuid_if_needed(2, 1)
    actual_df = my_df_func(input_df)
    ...

得到正确的good_uuid值 - '1-1-1-1',another_good_uuid但数据帧的 udf 版本的函数仍然调用未修补的 uuid4。

这里有什么问题?它是udf()函数正在做的事情吗?谢谢!

4

0 回答 0