我正在 AWS 中构建一个数据湖。源数据作为 CDC 导入 S3。我需要找到一种方法来合并它们,以便拥有一个包含最新版本信息的表格。
最初我想使用 Glue 进行 ETL 开发,但编辑器似乎相当笨重。此外,数据量并没有太大以至于需要火花。Pandas 也将发挥作用,并且在组织中拥有更广泛的知识库。
因此,我使用 Glue 来抓取导入,现在有了 Athena 表,我想在 Cloud9 中开发我的聚合,以便稍后迁移到 Lambda 函数。
问题是我无法将 Athena 数据放入数据框中。
我已经尝试了 boto3 中的 start_query_execution 函数,但它不返回数据,而只是将其写入我不想要的 S3 中。它还作为 QueryExecutionId 返回,我已将其传递给另一个名为 get_query_results 的 boto 函数。似乎有响应,但我在如何将数据传递到数据框(是 JSON 还是 dict?)上苦苦挣扎。
#python 3.6
import pandas as pd
import numpy as np
import boto3
import time
#https://dev.classmethod.jp/cloud/run-amazon-athenas-query-with-aws-lambda/
#athena constant
DATABASE = 'myDatabase'
TABLE = 'myTable'
#output
S3_OUTPUT = 's3://myBucket/myPath/'
client = boto3.client('athena')
response = client.start_query_execution(
QueryString='select * from myTable limit 100',
QueryExecutionContext={
'Database': DATABASE
},
ResultConfiguration={
'OutputLocation': S3_OUTPUT,
}
)
print(response["QueryExecutionId"])
time.sleep(50)
data = client.get_query_results(
QueryExecutionId=response["QueryExecutionId"]
)
dataDf = pd.read_json(data["ResultSet"])
print(dataDf.head())