
I am trying to transfer my data from an S3 bucket (address.csv) to AWS Aurora (MySQL) using AWS Glue. When I run the transfer with the script below, one of the columns, po_box_number, which is a varchar of length 10, gives me the error "An error occurred while calling o195.pyWriteDynamicFrame. Data truncation: Data too long for column 'po_box_number' at row 1". When I increased the size of the column for diagnostic purposes, I saw that the data is stored in JSON format. For example, if the value I need is "100", it is stored as {"long":100,"string":null}, and likewise if I try to store "E101", it is stored as {"long":null,"string":"E101"}.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "db1", table_name = "tb1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "tb1", transformation_ctx = "datasource0")   

#applymapping1 = Map.apply(frame = datasource0, f = AddProcessedTime)

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col6", "string", "po_box_number", "string")], transformation_ctx = "applymapping1")

#applymapping1 = ResolveChoice.apply(applymapping1, specs = [("po_box_number", "cast:string")])
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = applymapping1, database = "db1", table_name = "tb2", transformation_ctx = "datasink5")
job.commit()
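
For reference, values of the form {"long": ..., "string": ...} are typically how a Glue DynamicFrame represents a choice type, i.e. a column that was inferred as long in some rows and string in others. A minimal diagnostic sketch, assuming the same catalog table (db1 / tb1) as in the script above:

# Read the same catalog table and print the inferred schema.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "db1",
    table_name = "tb1",
    transformation_ctx = "datasource0"
)
# If the column came in as a mix of numeric and text values, the printed
# schema will contain a line like: |-- po_box_number: choice (long, string)
datasource0.printSchema()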

1 Answer


It turned out there was some corrupted data in my S3 bucket that was responsible for the conversion to JSON. Once I removed it, everything worked as expected.
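
As an alternative to removing the offending rows, the choice type can also be resolved explicitly before writing, along the lines of the commented-out ResolveChoice line in the question. A minimal sketch reusing the frame and table names from the question (an untested illustration, not a verified drop-in):

# Collapse the long/string choice on po_box_number into a plain string
# before writing to Aurora.
resolved = ResolveChoice.apply(
    frame = applymapping1,
    specs = [("po_box_number", "cast:string")],
    transformation_ctx = "resolvechoice1"
)
datasink5 = glueContext.write_dynamic_frame.from_catalog(
    frame = resolved,
    database = "db1",
    table_name = "tb2",
    transformation_ctx = "datasink5"
)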

answered 2021-09-09T19:44:37.133