I have working code like this:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

sc = SparkContext()
glueContext = GlueContext(sc)
s3_paths = ['01', '02', '03']  # these sub-paths live in the same folder and are partitioned under the source path
s3_source_path = 's3://bucket_name/'

for sub_path in s3_paths:
    s3_path = s3_source_path + sub_path
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    # get data from the s3 path
    job_DyF = glueContext.create_dynamic_frame.from_options(
        's3', {"paths": [s3_path], "recurse": True}, "json",
        format_options={"jsonPath": "$[*]"}, transformation_ctx="job_DyF")
    # write dataset to s3 avro
    # (df_verify_filtered is derived from job_DyF by a filtering step not shown here)
    data_sink = glueContext.write_dynamic_frame.from_options(
        frame=df_verify_filtered, connection_type="s3",
        connection_options={"path": "s3://target", "partitionKeys": ["partition_0", "partition_1", "partition_2"]},
        format="avro", transformation_ctx="data_sink")
    job.commit()
After the job succeeded, some of the sub_paths are missing records.
When I try to run the job again, it says no new file detected.
So I tried running the code with one specific sub_path at a time instead of the for sub_path in s3_paths loop.
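Concretely, the per-sub_path test looked roughly like this; only the loop is replaced with a hard-coded sub_path, everything else is the same as the job above:

# Single-sub_path test: process just one partition instead of looping.
sub_path = '01'  # then '02' on the next run
s3_path = s3_source_path + sub_path
job_DyF = glueContext.create_dynamic_frame.from_options(
    's3', {"paths": [s3_path], "recurse": True}, "json",
    format_options={"jsonPath": "$[*]"}, transformation_ctx="job_DyF")
# ... filtering, the avro sink, and job.commit() as in the full job above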
Strangely, the problem shows up when the job is run for sub_path #2:
it says
no new file detected
for sub_path '02',
even though the job had so far only been run for the first sub_path '01', and only data from that first sub_path had been ingested into the S3 avro output.
I can't figure out what is wrong with the way I have set up this bookmark, so any insight would be greatly appreciated! Thanks in advance.
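One thing I'm wondering about, but haven't tested: since the bookmark state is keyed by the transformation_ctx string, every iteration of my loop reuses the same "job_DyF" / "data_sink" names. Would each sub_path need its own context, roughly like this (the "_" + sub_path suffix is just an idea, not something I've verified)?

# Untested idea: a distinct transformation_ctx per sub_path, so the bookmark
# tracks each sub_path separately instead of sharing one state across the loop.
for sub_path in s3_paths:
    s3_path = s3_source_path + sub_path
    job_DyF = glueContext.create_dynamic_frame.from_options(
        's3', {"paths": [s3_path], "recurse": True}, "json",
        format_options={"jsonPath": "$[*]"},
        transformation_ctx="job_DyF_" + sub_path)  # e.g. "job_DyF_01"
    # ... filtering and the avro sink as above, with
    # transformation_ctx="data_sink_" + sub_path on the write as well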