
I have job code like this:

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)
s3_paths = ['01', '02', '03']  # these sub-paths live in the same folder and are partitioned under the source path
s3_source_path = 'bucket_name/'
for sub_path in s3_paths:
    s3_path = s3_source_path + sub_path
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    # read this sub_path's JSON data from S3
    job_DyF = glueContext.create_dynamic_frame.from_options('s3', {"paths": [s3_path], "recurse": True}, "json", format_options={"jsonPath": "$[*]"}, transformation_ctx="job_DyF")

    # write the dataset back to S3 as Avro
    data_sink = glueContext.write_dynamic_frame.from_options(frame=job_DyF, connection_type="s3", connection_options={"path": "s3://target", "partitionKeys": ["partition_0", "partition_1", "partition_2"]}, format="avro", transformation_ctx="data_sink")

    job.commit()

After the job succeeds, records are missing from some of the sub_paths.

When I try to run the job again, it says no new file detected.

So I tried running the code for one specific sub_path at a time instead of looping with for sub_path in s3_paths (a sketch of what I mean follows below), and strangely, the problem appears when the job is run for sub_path #2:

it says no new file detected for sub_path '02',

even though the job has only ever been run against the first sub_path '01', and only data from that first sub_path was ingested into the S3 Avro output.
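To be concrete, a minimal sketch of one of these single-sub_path runs, assuming the same imports and SparkContext/GlueContext setup as the job above; the only difference is that the sub_path is hard-coded instead of iterated:

# minimal sketch of a single-sub_path run; setup is identical to the job above,
# one sub_path is simply hard-coded instead of looped over
sub_path = '02'
s3_path = s3_source_path + sub_path

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# read only this sub_path's JSON data from S3
job_DyF = glueContext.create_dynamic_frame.from_options('s3', {"paths": [s3_path], "recurse": True}, "json", format_options={"jsonPath": "$[*]"}, transformation_ctx="job_DyF")

# write it back to S3 as Avro
data_sink = glueContext.write_dynamic_frame.from_options(frame=job_DyF, connection_type="s3", connection_options={"path": "s3://target", "partitionKeys": ["partition_0", "partition_1", "partition_2"]}, format="avro", transformation_ctx="data_sink")

job.commit()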

I can't figure out what is wrong with the way I've set up this bookmark, so any insight would be greatly appreciated. Thanks in advance!
