3

看看这个例子。它从 s3 目录中读取数据,然后写回 s3 文件夹。但是,如果我添加数据并重新运行该作业会怎样?我是对的,aws 胶水再次读取和写入所有数据?或者它只检测(如何?)新数据并只写它?

顺便说一句,如果我从分区数据中读取,我必须自己指定“新到”分区?

4

1 回答 1

1

从我在该示例中可以看到,他们正在从 S3 中的爬网位置读取数据,然后每次都替换一个文件,完全重新加载所有数据。

要仅处理新文件,您需要为您的作业启用书签,并确保您通过执行以下操作来提交作业:

args = getResolvedOptions(sys.argv, [‘TempDir’,’JOB_NAME’])
glue_context = GlueContext(SparkContext.getOrCreate()

# Instantiate your job object to later commit
job = Job(glue_context)
job.init(args[‘JOB_NAME’], args)

# Read file, if you enable Bookmark and commit at the end, this will only
# give you new files
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_name)

result_dynamic_frame = # do some operations

# Append operation to create new parquet files from new data
result_dynamic_frame.toDF().write
  .mode("append")
  .parquet("s3://bucket/prefix/permit-inspections.parquet")

# Commit my job so next time we read, only new files will come in
job.commit()

希望这可以帮助

于 2018-01-16T16:23:40.967 回答