1

我对 DataLakes 比较陌生,我正在对 AWS 上的一个项目进行一些研究。

我创建了一个 DataLake 并从 Glue Crawlers 生成了表,我可以在 S3 中查看数据并使用 Athena 对其进行查询。到目前为止,一切都很好。

需要将存储在数据湖中的部分数据转换为 RDS,以便应用程序读取数据。从 S3 DataLake 到 RDS 的 ETL 最佳解决方案是什么?

我遇到的大多数帖子都在谈论从 RDS 到 S3 的 ETL,而不是相反。

4

3 回答 3

1

您可以通过胶水作业来实现这一点。示例代码:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

file_paths = ['path']


df = glueContext.create_dynamic_frame_from_options("s3", {'paths': file_paths}, format="csv", format_options={"separator": ",", "quoteChar": '"', "withHeader": True})


df.printSchema()

df.show(10)

options = {
'user': 'usr',
'password': 'pwd', 
'url': 'url',
'dbtable': 'tabl'}


glueContext.write_from_options(frame_or_dfc=df, connection_type="mysql", connection_options=options)
于 2020-12-09T22:33:59.630 回答
1

一旦数据在 Spark DataFrame 的 Glue DataFrame 中,写出来就非常简单了。使用 RDBMS 作为数据接收器。

例如,要写入 Redshift DB,

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "preactions" -> "<another SQL queries>",
    "postactions" -> "<some SQL queries>"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)

如上所示,使用Connection您创建的 JDBC 将数据写入。

于 2020-01-13T19:54:38.237 回答
1

通过使用 Spark 作业类型创建 Glue 作业,我能够将 S3 表用作数据源,并将 Aurora/MariaDB 用作目标。

尝试使用 python 作业类型进行相同操作时,我无法在 Glue 作业向导屏幕期间查看任何 S3 表。

于 2019-11-08T15:39:02.127 回答