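I am new to AWS and still exploring it. I would appreciate insights or advice on the best way to implement the following.
I want to fetch data from multiple MySQL tables:
- user_transaction
- user_loans
- promo_offers
To build the final table, I found the following two approaches.
Approach 1:
- First, create a Data Catalog table for each source table and read each one,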
    # transformation_ctx plus the bookmark options let job bookmarks track transaction_id
    user_transaction = glueContext.create_dynamic_frame.from_catalog(
        database="Org_Data_Lake",
        table_name="user_transaction",
        transformation_ctx="user_transaction",
        additional_options={"jobBookmarkKeys": ["transaction_id"], "jobBookmarkKeysSortOrder": "asc"})
    user_loans = glueContext.create_dynamic_frame.from_catalog(
        database="Org_Data_Lake",
        table_name="user_loans")
    promo_offers = glueContext.create_dynamic_frame.from_catalog(
        database="Org_Data_Lake",
        table_name="promo_offers")
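- Then use Join.apply to build the final table,
    from awsglue.transforms import Join

    # Join loans to offers first, then join the result to transactions
    final_history = Join.apply(user_transaction,
        Join.apply(user_loans, promo_offers, 'offer_id', 'offer_id'),
        'user_loan_id', 'user_loan_id').drop_fields([.......])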
- Finally, write all the data to S3
    glueContext.write_dynamic_frame.from_options(frame=final_history,
        connection_type="s3",
        connection_options={"path": "s3://glue-sample-target/output-dir/final_history"},
        format="parquet")
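To spell out what I expect the nested Join.apply in approach 1 to produce, here is a plain-Python sketch. It assumes Join.apply behaves as an inner equi-join on the given keys; the `inner_join` helper and the sample rows are made up for illustration and are not part of Glue or my real data.

```python
def inner_join(left, right, left_key, right_key):
    """Inner equi-join of two lists of dicts (hypothetical helper, not a Glue API)."""
    # Index the right side by its join key so each left row can look up matches.
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    # Emit one merged row per matching pair; left rows without a match are dropped.
    return [{**l, **r} for l in left for r in index.get(l[left_key], [])]


# Made-up sample rows, only to show the shape of the result.
user_transaction = [
    {"transaction_id": 1, "user_loan_id": 10, "status": "paid"},
    {"transaction_id": 2, "user_loan_id": 99, "status": "failed"},  # no matching loan
]
user_loans = [{"user_loan_id": 10, "offer_id": 7, "loan_status": "active"}]
promo_offers = [{"offer_id": 7, "offer_amount": 500}]

# Same nesting as in approach 1: loans joined to offers, then to transactions.
loans_with_offers = inner_join(user_loans, promo_offers, "offer_id", "offer_id")
final_history = inner_join(user_transaction, loans_with_offers,
                           "user_loan_id", "user_loan_id")
print(final_history)
```

Under that assumption, a transaction whose loan has no matching row would not appear in final_history at all.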
Approach 2:
- Prepare the final_history data in a single query,
    # The JDBC "dbtable" option accepts a parenthesized subquery with an alias
    query = """(SELECT t1.transaction_id, t1.status, t2.loan_status, t3.offer_amount
    FROM user_transaction AS t1
    JOIN user_loans AS t2 ON (t2.user_loan_id = t1.user_loan_id)
    JOIN promo_offers AS t3 ON (t3.offer_id = t2.offer_id)
    WHERE t1.created_at > '2020-01-01 00:00:00' LIMIT 10) AS tmp"""
    from awsglue.dynamicframe import DynamicFrame

    # Parentheses let the chained calls span multiple lines
    final_history_data = (
        glueContext.read.format("jdbc")
        .option("driver", jdbc_driver_name)
        .option("url", db_url)
        .option("dbtable", query)
        .option("user", db_username)
        .option("password", db_password)
        .load()
    )
    final_history = DynamicFrame.fromDF(final_history_data, glueContext, "final_history")
- Finally, write all the data to S3
    glueContext.write_dynamic_frame.from_options(frame=final_history,
        connection_type="s3",
        connection_options={"path": "s3://glue-sample-target/output-dir/final_history"},
        format="parquet")
Which approach is better, and how can jobBookmarkKeys be applied to approach 2?
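To make the second part of the question concrete: as far as I understand, job bookmarks only track reads made through Glue's own DynamicFrame sources, so a plain Spark JDBC read like the one in approach 2 would probably need its own high-water mark. Below is a minimal sketch of what I have in mind, not a confirmed Glue feature. `build_incremental_query`, `next_watermark` and `last_seen_id` are hypothetical names of mine, and where the watermark would be persisted between runs is left open.

```python
from typing import Iterable, Optional


def build_incremental_query(last_seen_id: int, limit: Optional[int] = None) -> str:
    """Build the JDBC 'dbtable' subquery, restricted to rows after last_seen_id."""
    # int() rejects non-numeric values before they are interpolated into SQL.
    watermark = int(last_seen_id)
    query = (
        "(SELECT t1.transaction_id, t1.status, t2.loan_status, t3.offer_amount "
        "FROM user_transaction AS t1 "
        "JOIN user_loans AS t2 ON (t2.user_loan_id = t1.user_loan_id) "
        "JOIN promo_offers AS t3 ON (t3.offer_id = t2.offer_id) "
        f"WHERE t1.transaction_id > {watermark} "
        "ORDER BY t1.transaction_id ASC"
    )
    if limit is not None:
        query += f" LIMIT {int(limit)}"
    return query + ") AS tmp"


def next_watermark(previous: int, loaded_ids: Iterable[int]) -> int:
    """Return the highest transaction_id seen so far (unchanged if nothing loaded)."""
    return max([previous, *loaded_ids])


# Example: the previous run stopped at transaction_id 100.
query = build_incremental_query(last_seen_id=100)
print(query)
```

The returned string would be passed to `.option("dbtable", query)` in place of the fixed `created_at` filter, and the result of `next_watermark` would be stored after a successful write so the next run starts from there.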