我对 DataLakes 比较陌生,我正在对 AWS 上的一个项目进行一些研究。
我创建了一个 DataLake 并从 Glue Crawlers 生成了表,我可以在 S3 中查看数据并使用 Athena 对其进行查询。到目前为止,一切都很好。
需要将存储在数据湖中的部分数据转换为 RDS,以便应用程序读取数据。从 S3 DataLake 到 RDS 的 ETL 最佳解决方案是什么?
我遇到的大多数帖子都在谈论从 RDS 到 S3 的 ETL,而不是相反。
我对 DataLakes 比较陌生,我正在对 AWS 上的一个项目进行一些研究。
我创建了一个 DataLake 并从 Glue Crawlers 生成了表,我可以在 S3 中查看数据并使用 Athena 对其进行查询。到目前为止,一切都很好。
需要将存储在数据湖中的部分数据转换为 RDS,以便应用程序读取数据。从 S3 DataLake 到 RDS 的 ETL 最佳解决方案是什么?
我遇到的大多数帖子都在谈论从 RDS 到 S3 的 ETL,而不是相反。
您可以通过胶水作业来实现这一点。示例代码:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
file_paths = ['path']
df = glueContext.create_dynamic_frame_from_options("s3", {'paths': file_paths}, format="csv", format_options={"separator": ",", "quoteChar": '"', "withHeader": True})
df.printSchema()
df.show(10)
options = {
'user': 'usr',
'password': 'pwd',
'url': 'url',
'dbtable': 'tabl'}
glueContext.write_from_options(frame_or_dfc=df, connection_type="mysql", connection_options=options)
一旦数据在 Spark DataFrame 的 Glue DataFrame 中,写出来就非常简单了。使用 RDBMS 作为数据接收器。
例如,要写入 Redshift DB,
// Write data to staging table in Redshift
glueContext.getJDBCSink(
catalogConnection = "redshift-glue-connections-test",
options = JsonOptions(Map(
"database" -> "conndb",
"dbtable" -> staging,
"overwrite" -> "true",
"preactions" -> "<another SQL queries>",
"postactions" -> "<some SQL queries>"
)),
redshiftTmpDir = tempDir,
transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)
如上所示,使用Connection
您创建的 JDBC 将数据写入。
通过使用 Spark 作业类型创建 Glue 作业,我能够将 S3 表用作数据源,并将 Aurora/MariaDB 用作目标。
尝试使用 python 作业类型进行相同操作时,我无法在 Glue 作业向导屏幕期间查看任何 S3 表。