0

TL;博士

  • 我正在尝试使用 Glue [Studio] 作业将许多 S3 数据文件合并为更少的数量
  • 输入数据在 Glue 中编目并可通过 Athena 查询
  • Glue 作业以“成功”输出状态运行,但未创建输出文件

细节

输入我有从刮板以每分钟一次的周期创建的数据。它将 JSON (gzip) 格式的输出转储到存储桶中。我在 Glue 中对这个存储桶进行了编目,并且可以使用 Athena 毫无错误地查询它。这让我更有信心正确设置目录和数据结构。单独来说,这并不理想,因为它每天创建约 1.4K 文件,这使得对数据的查询(通过 Athena)非常慢,因为它们必须扫描太多、太小的文件

目标我想定期(可能每周一次,每月一次,我还不确定)将每分钟一次的文件合并到更少的文件中,以便查询扫描更大和更少的文件(更快的查询)。

方法我的计划是创建一个 Glue ETL 作业(使用 Glue Studio)从目录表中读取,并写入一个新的 S3 位置(保持相同的 JSON-gzip 格式,所以我可以将 Glue 表重新指向包含合并文件的新 S3 位置)。我使用 Glue Studio 设置了作业,当我运行它时它说成功,但是没有输出到指定的 S3 位置不是空文件,根本没有输出)。

卡住!我有点茫然,因为(1)它说它成功了,(2)我什至没有修改脚本(见下文),所以我认为(也许是个坏主意)它不是那。

日志我尝试通过 CloudWatch 日志查看它是否会有所帮助,但我并没有从中得到什么。我怀疑它可能与此条目有关,但我找不到确认或更改任何内容以“修复”它的方法。(路径肯定存在,我可以在 S3 中看到它,目录可以搜索它并由 Athena 查询验证,并且它是由 Glue Studio 脚本生成器自动生成的。)对我来说,这听起来像我我在某处选择了一个选项,使它认为我只想要对数据进行某种“增量”扫描。但我没有(有意地),也找不到任何能让我看起来有的地方。

CloudWatch 日志条目

21/03/13 17:59:39 WARN HadoopDataSource: Skipping Partition {} as no new files detected @ s3://my_bucket/my_folder/my_source_data/ or path does not exist

胶水脚本

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "my_database", table_name = "my_table", transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "DataSource0")
## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://my_bucket/my_folder/consolidation/", "compression": "gzip", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = DataSource0]
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://my_bucket/my_folder/consolidation/", "compression": "gzip", "partitionKeys": []}, transformation_ctx = "DataSink0")
job.commit()

我首先研究的其他帖子

没有人有同样的问题,即“成功”的工作没有提供任何输出。但是,一个创建了空文件,而另一个创建了太多文件。最有趣的方法是使用 Athena 为您创建新的输出文件(使用外部表);但是,当我对此进行研究时,似乎输出格式选项没有 JSON-gzip(或没有 gzip 的 JSON),而只有 CSV 和 Parquet,我不喜欢使用它们。

如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet

AWS Glue:ETL 作业创建许多空输出文件

AWS Glue 作业 - 写入单个 Parquet 文件

AWS Glue,输出一个带分区的文件

4

1 回答 1

0

datasource_df = DataSource0.repartition(1)

DataSink0 = glueContext.write_dynamic_frame.from_options(frame = datasource_df, connection_type = "s3", format = "json", connection_options = {"path": "s3://my_bucket/my_folder/consolidation/", "compression": "gzip ", "partitionKeys": []}, transformation_ctx = "DataSink0")

工作提交()

于 2021-07-28T06:46:34.097 回答