1

我正在使用 Glue 读取 DynamoDB 表,由于动态模式,可能会发生某些列不存在的情况。使用以下代码添加它们可以正常工作,但如果我需要添加多个列,我不确定如何使函数动态化。

# add missing columns if not available
def AddCustRegName(r):
    r["customerRegistrationName"] = ""  # add column with empty string.
    return r

if addCustRegName:
    case_df_final = Map.apply(frame=case_df_final, f=AddCustRegName)

有什么建议么?

以下代码因以下错误而失败

# add missing columns if not available
def AddColumn(r, col):
    r[col] = ""  # add column with empty string.
    return r

case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final ,'accessoryTaxIncluded'))

case_df_final.toDF().printSchema()

无法执行第 6 行:case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final ,'accessoryTaxIncluded')) Traceback(最近一次调用最后):文件“/tmp/zeppelin_pyspark-4928209310219195923.py”,第 375 行,在 exec(code, _zcUserQueryNameSpace) File "", line 6, in File "", line 3, in AddColumn TypeError: 'DynamicFrame' object does not support item assignment

4

1 回答 1

0

你传入的函数Map只能有一个参数:

f– 应用于 DynamicFrame 中所有 DynamicRecords 的函数。该函数必须将 DynamicRecord 作为参数并返回由映射生成的新 DynamicRecord(必需)。

但是,您可以在 pyspark 数据框而不是 DynamicFrame 上执行此操作:

from pyspark.sql import functions as F

def AddColumn(sdf, new_col):
    return sdf.withColumn(new_col, F.lit(""))

case_sdf_final = AddColumn(case_df_final.toDF(), "accessoryTaxIncluded")

case_sdf_final.printSchema()

或者,如果您有要添加的列列表,您可以functools.reduce像这样使用:

import functools

new_cols = ["customerRegistrationName", "accessoryTaxIncluded"]

case_sdf_final = functools.reduce(
    lambda acc, c: AddColumn(acc, c),
    new_cols,
    case_df_final.toDF()
)

case_sdf_final.printSchema()

然后回到DynamicFrame:

case_df_final = DynamicFrame.fromDF(case_sdf_final, glueContext, "case_df_final")
于 2021-02-08T16:48:14.380 回答