
I'm working on a personal project that requires creating an AWS Glue job that performs some basic transformations and moves the data into a DocumentDB database.

The main problem I'm running into right now is that I can't get the data into the DocumentDB database.

My Glue job code is as follows:

```python
import sys
from awsglue.transforms import *  # provides DropFields
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

documentdb_uri = "mongodb://mydocumentdb_uri"
documentdb_write_uri = "mongodb://mydocumentdb_uri"

# Options for reading from DocumentDB (defined but not used in this job)
read_docdb_options = {
    "uri": documentdb_uri,
    "database": "my_db_name",
    "collection": "my_collection",
    "username": "my_username",
    "password": "my_password",
    "ssl": "true",
    "ssl.domain_match": "false",
}

# Options for writing to DocumentDB
write_documentdb_options = {
    "uri": documentdb_write_uri,
    "database": "my_db_name",
    "collection": "my_collection",
    "username": "my_username",
    "password": "my_password"
}

## @type: DataSource
## @args: [database = "my-project-data-database", table_name = "my-project-data-table", redshift_tmp_dir = args["TempDir"], transformation_ctx = "<transformation_ctx>"]
## @return: <output>
## @inputs: []
# Read the S3 data through the table the Glue crawler created in the Data Catalog
S3SourceData = glueContext.create_dynamic_frame.from_catalog(database = "my-project-data-database", table_name = "my-project-data-table")

## @type: DropFields
## @args: [paths = [<paths>], transformation_ctx = "<transformation_ctx>"]
## @return: <output>
## @inputs: [frame = <frame>]
# Remove the unwanted fields from every record
Transformed_Example = DropFields.apply(frame = S3SourceData, paths = ['Multiple fields to drop here'])

# Write the transformed frame to DocumentDB
glueContext.write_dynamic_frame.from_options(Transformed_Example, connection_type="documentdb",
                                             connection_options=write_documentdb_options)

job.commit()
```
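One visible difference in the code above is that the read options include TLS settings while the write options do not. A minimal sketch for keeping the two dictionaries consistent is a small helper that builds both from the same values. The helper `build_documentdb_options` is hypothetical; the key names (`uri`, `database`, `collection`, `username`, `password`, `ssl`, `ssl.domain_match`, `retryWrites`) are taken from the AWS Glue DocumentDB connection examples, and it is an assumption that the TLS keys are also honored on writes, so check them against the Glue documentation for your Glue version.

```python
from typing import Dict


def build_documentdb_options(
    uri: str,
    database: str,
    collection: str,
    username: str,
    password: str,
    for_write: bool = False,
) -> Dict[str, str]:
    """Build a connection_options dict for a Glue "documentdb" connection.

    Hypothetical helper; key names follow the AWS Glue DocumentDB examples.
    """
    options = {
        "uri": uri,
        "database": database,
        "collection": collection,
        "username": username,
        "password": password,
        # TLS settings copied from the read options (assumption: the cluster
        # has TLS enabled and the same keys apply to writes)
        "ssl": "true",
        "ssl.domain_match": "false",
    }
    if for_write:
        # The Glue DocumentDB write example sets retryWrites to "false";
        # some DocumentDB versions do not support retryable writes.
        options["retryWrites"] = "false"
    return options


# Usage with the placeholder values from the job above
write_documentdb_options = build_documentdb_options(
    "mongodb://mydocumentdb_uri", "my_db_name", "my_collection",
    "my_username", "my_password", for_write=True,
)
```

The resulting dictionary would be passed as `connection_options` to `write_dynamic_frame.from_options`, exactly as in the job.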

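This project is for learning purposes only, so I'm not planning to do any major, complex ETL. I just want to take the data from an S3 bucket, which I have already crawled with a Glue crawler, drop some fields, and move it to DocumentDB.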
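To make the "drop some fields" step concrete: `DropFields` removes the listed field paths from every record in the DynamicFrame. The plain-Python function below is a hypothetical illustration of that effect on a single dict record, assuming a dot in a path refers to a nested field; it is not Glue's implementation and runs without Glue.

```python
import copy
from typing import Any, Dict, List


def drop_fields(record: Dict[str, Any], paths: List[str]) -> Dict[str, Any]:
    """Return a copy of record with the given paths removed.

    Hypothetical illustration: "a.b" means key "b" inside the dict at key "a".
    Paths that do not exist are ignored.
    """
    result = copy.deepcopy(record)  # leave the input record untouched
    for path in paths:
        *parents, leaf = path.split(".")
        node: Any = result
        # Walk down to the dict that should contain the leaf key
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
            if node is None:
                break
        if isinstance(node, dict):
            node.pop(leaf, None)
    return result


record = {"id": 1, "name": "x", "meta": {"source": "s3", "crawler": "c"}}
print(drop_fields(record, ["name", "meta.crawler"]))
# → {'id': 1, 'meta': {'source': 's3'}}
```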

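I feel like I'm missing something fundamental about moving the transformed data into DocumentDB, but I can't figure out what. I've gone through the Glue and DocumentDB documentation, but either I couldn't find an example of what I'm trying to do, or I just don't understand the examples given.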

I've been at this for almost 10 hours, so I would really appreciate any help.
