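I'm working on a personal project that requires creating an AWS Glue job that performs some basic transformations and then moves the data into a DocumentDB database.
The main problem I'm running into right now is that I can't get the data into the DocumentDB database.
My Glue job code is as follows: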
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Connection strings for the DocumentDB cluster (placeholders)
documentdb_uri = "mongodb://mydocumentdb_uri"
documentdb_write_uri = "mongodb://mydocumentdb_uri"

# Options for reading from DocumentDB, with TLS settings (defined but not used below)
read_docdb_options = {
    "uri": documentdb_uri,
    "database": "my_db_name",
    "collection": "my_collection",
    "username": "my_username",
    "password": "my_password",
    "ssl": "true",
    "ssl.domain_match": "false",
}

# Options for writing to DocumentDB (no "ssl" keys are set here)
write_documentdb_options = {
    "uri": documentdb_write_uri,
    "database": "my_db_name",
    "collection": "my_collection",
    "username": "my_username",
    "password": "my_password"
}

## @type: DataSource
## @args: [database = "my-project-data-database", table_name = "my-project-data-table", redshift_tmp_dir = args["TempDir"], transformation_ctx = "<transformation_ctx>"]
## @return: <output>
## @inputs: []
# Read the crawled S3 data through the Glue Data Catalog
S3SourceData = glueContext.create_dynamic_frame.from_catalog(database = "my-project-data-database", table_name = "my-project-data-table")

## @type: DataSink
## @args: [database = "my-project-data-database", table_name = "my-project-table-name", redshift_tmp_dir = "<redshift_tmp_dir>", transformation_ctx = "<transformation_ctx>"]
## @return: <output>
## @inputs: [frame = <frame>]
# Writes S3SourceData back to the same catalog table it was read from
data_to_write_to_documentdb = glueContext.write_dynamic_frame.from_catalog(frame = S3SourceData, database = "my-project-data-database", table_name = "my-project-data-table")

## @type: DropFields
## @args: [paths = [<paths>], transformation_ctx = "<transformation_ctx>"]
## @return: <output>
## @inputs: [frame = <frame>]
# Drop the unwanted fields from the source frame
Transformed_Example = DropFields.apply(frame = S3SourceData, paths = ['Multiple fields to drop here'])

# Write the frame to DocumentDB
glueContext.write_dynamic_frame.from_options(Transformed_Example, connection_type="documentdb",
                                             connection_options=write_documentdb_options)
job.commit()
```
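The read options in the job set `"ssl"` and `"ssl.domain_match"`, while the write options do not. One way to keep the two dictionaries from drifting apart is to build both from a shared base. This is a minimal sketch only: `base_docdb_options` and `with_tls` are hypothetical helpers, and the assumption that the write path needs the same TLS keys as the read path (for example, because TLS is enabled on the cluster) is not something confirmed by the job above.

```python
from typing import Dict


def base_docdb_options(uri: str, database: str, collection: str,
                       username: str, password: str) -> Dict[str, str]:
    """Hypothetical helper: connection options shared by the read and write paths."""
    return {
        "uri": uri,
        "database": database,
        "collection": collection,
        "username": username,
        "password": password,
    }


def with_tls(options: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical helper: return a copy of `options` with the TLS keys the read options use.

    Assumption: the cluster has TLS enabled, so writes need the same settings as reads.
    """
    tls_options = dict(options)  # copy so the shared base stays unchanged
    tls_options["ssl"] = "true"
    tls_options["ssl.domain_match"] = "false"
    return tls_options


base = base_docdb_options("mongodb://mydocumentdb_uri", "my_db_name",
                          "my_collection", "my_username", "my_password")
read_docdb_options = with_tls(base)
write_documentdb_options = with_tls(base)
```

`write_documentdb_options` would then be passed to `write_dynamic_frame.from_options` exactly as in the job; building both dictionaries from one base means a TLS or credential change only has to be made in one place.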
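This project is purely for learning, so I'm not trying to do any major, complex ETL. I just want to take data from an S3 bucket that I've already crawled with a Glue crawler, drop a few fields, and move it to DocumentDB.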
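To make the intended transformation concrete: `DropFields` removes the listed fields from every record, with nested fields addressed by dot-separated paths. The plain-Python sketch below only mimics that effect on a single dictionary record; `drop_fields` is a hypothetical helper for illustration, not Glue's implementation, and the sample record is made up.

```python
import copy
from typing import Any, Dict, Iterable


def drop_fields(record: Dict[str, Any], paths: Iterable[str]) -> Dict[str, Any]:
    """Return a copy of `record` without the fields named by dotted `paths`."""
    result = copy.deepcopy(record)  # leave the caller's record untouched
    for path in paths:
        *parents, leaf = path.split(".")
        node: Any = result
        # Walk down to the parent of the field to drop.
        for key in parents:
            if not isinstance(node, dict):
                break
            node = node.get(key)
        # Only drop the field if its parent exists and is a nested record.
        if isinstance(node, dict):
            node.pop(leaf, None)
    return result


record = {"id": 1, "name": "a", "address": {"city": "x", "zip": "123"}}
print(drop_fields(record, ["name", "address.zip"]))
# prints {'id': 1, 'address': {'city': 'x'}}
```

Paths that don't exist in a record are silently ignored in this sketch, which keeps it safe to apply the same path list to records with slightly different shapes.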
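I feel like I'm missing something basic that's needed to move the transformed data into DocumentDB, but I can't figure out what. I've gone through the Glue and DocumentDB documentation, but I can't find an example of what I'm trying to do, or I just don't understand the examples given.
I've been at this for almost 10 hours, so I'd really appreciate it if anyone could help me out here.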