0

我正在尝试迁移一些 udf 函数以提高某些进程对 pandas udf 的性能,但我不知道如何处理异常。

原始功能

def json_prepare(row: str) -> str:
col1 = row[0]
col2 = row[1]
    try:
        return json.dumps(adap_treatment(json.loads(col1), col2))
    except json.JSONDecodeError:
        return None

udf_prepare = F.udf(json_prepare, StringType())
df = df.withColumn('new_col', udf_prepare(F.struct(F.col('col1'), F.col('col2'))))

我的尝试之一

def json_prepare(col1: pd.Series, col2: pd.Series) -> pd.Series:
    try:
        return pd.Series([json.dumps(adap_treatment(json.loads(c1), c2)) for c1, c2 in zip(col1, col2)])
    except json.JSONDecodeError:
        return None

udf_prepare = F.pandas_udf(json_prepare, StringType())
df = df.withColumn('new_col', udf_prepare(F.col('col1'), F.col('col2')))

try 子句有效,但发生异常时失败。我也尝试过(系列迭代器到系列迭代器),但我仍然收到错误

4

0 回答 0