我有一个运行我的PySpark测试的单元测试(使用PyTest ) 。我有正常的创建SQLContext。我想在所有情况下都得到相同的 uuid4,所以我在测试中修补了 uuid4。如果我从测试函数调用,一切都很好。conftest.py
uuid.uuid4()
但是,当我运行也调用 uuid4 的 PySpark 作业时,它没有被修补:
我的 PySpark 函数(简化):
def create_uuid_if_needed(current, prev):
if current > prev:
return str(uuid.uuid4())
else:
return None
def my_df_func(df):
my_udf = udf(create_uuid_if_needed, T.StringType())
my_window = Window.partitionBy(F.col(PARTITIONING_KEY)).orderBy(F.col(ORDER))
return df.withColumn('new_col', my_udf(df.col, F.lag(df.col, 1)).over(my_window))
我的测试如下所示:
@patch.object(uuid, 'uuid4', return_value='1-1-1-1')
def test_add_activity_period_start_id(mocker, sql_context, input_fixture):
input_df = sql_context.createDataFrame(input_fixture, [... schema...])
good_uuid = str(uuid.uuid4())
another_goood_uuid = create_uuid_if_needed(2, 1)
actual_df = my_df_func(input_df)
...
得到正确的good_uuid
值 - '1-1-1-1',another_good_uuid
但数据帧的 udf 版本的函数仍然调用未修补的 uuid4。
这里有什么问题?它是udf()
函数正在做的事情吗?谢谢!