看看这个例子。它从 s3 目录中读取数据,然后写回 s3 文件夹。但是,如果我添加数据并重新运行该作业会怎样?我是对的,aws 胶水再次读取和写入所有数据?或者它只检测(如何?)新数据并只写它?
顺便说一句,如果我从分区数据中读取,我必须自己指定“新到”分区?
看看这个例子。它从 s3 目录中读取数据,然后写回 s3 文件夹。但是,如果我添加数据并重新运行该作业会怎样?我是对的,aws 胶水再次读取和写入所有数据?或者它只检测(如何?)新数据并只写它?
顺便说一句,如果我从分区数据中读取,我必须自己指定“新到”分区?
从我在该示例中可以看到,他们正在从 S3 中的爬网位置读取数据,然后每次都替换一个文件,完全重新加载所有数据。
要仅处理新文件,您需要为您的作业启用书签,并确保您通过执行以下操作来提交作业:
args = getResolvedOptions(sys.argv, [‘TempDir’,’JOB_NAME’])
glue_context = GlueContext(SparkContext.getOrCreate()
# Instantiate your job object to later commit
job = Job(glue_context)
job.init(args[‘JOB_NAME’], args)
# Read file, if you enable Bookmark and commit at the end, this will only
# give you new files
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_name)
result_dynamic_frame = # do some operations
# Append operation to create new parquet files from new data
result_dynamic_frame.toDF().write
.mode("append")
.parquet("s3://bucket/prefix/permit-inspections.parquet")
# Commit my job so next time we read, only new files will come in
job.commit()
希望这可以帮助