我正在尝试迁移一些 udf 函数以提高某些进程对 pandas udf 的性能,但我不知道如何处理异常。
原始功能
def json_prepare(row: str) -> str:
col1 = row[0]
col2 = row[1]
try:
return json.dumps(adap_treatment(json.loads(col1), col2))
except json.JSONDecodeError:
return None
udf_prepare = F.udf(json_prepare, StringType())
df = df.withColumn('new_col', udf_prepare(F.struct(F.col('col1'), F.col('col2'))))
我的尝试之一
def json_prepare(col1: pd.Series, col2: pd.Series) -> pd.Series:
try:
return pd.Series([json.dumps(adap_treatment(json.loads(c1), c2)) for c1, c2 in zip(col1, col2)])
except json.JSONDecodeError:
return None
udf_prepare = F.pandas_udf(json_prepare, StringType())
df = df.withColumn('new_col', udf_prepare(F.col('col1'), F.col('col2')))
try 子句有效,但发生异常时失败。我也尝试过(系列迭代器到系列迭代器),但我仍然收到错误