3

有没有人使用过 Amazon Quantum Ledger Database (QLDB) Amazon ion文件?如果是这样,您知道如何提取“数据”部分来制定表格吗?也许使用 python 来抓取数据?我正在尝试从存储在 s3 中的这些文件中获取“数据”信息(我无权访问 QLDB,因此无法直接查询),然后将结果上传到 Glue。

我正在尝试使用 GLue 执行 ETL 作业,但 Glue 不喜欢 Amazon Ion 文件,因此我需要从这些文件中查询数据或抓取文件以获取相关信息。

谢谢。   PS  :“数据”信息是指:

{
    PersonId:"4tPW8xtKSGF5b6JyTihI1U",
    LicenseNumber:"LEWISR261LL",
    LicenseType:"Learner",
    ValidFromDate:2016–12–20,
    ValidToDate:2020–11–15
}

参考 :https ://docs.aws.amazon.com/qldb/latest/developerguide/working.userdata.html

4

2 回答 2

3

您是否尝试过使用Amazon Ion库?

假设问题中提到的数据存在于名为“myIonFile.ion”的文件中,并且如果文件中只有离子对象,我们可以从文件中读取数据,如下所示:

from amazon.ion import simpleion

file = open("myIonFile.ion", "rb")                    # opening the file
data = file.read()                                    # getting the bytes for the file
iondata = simpleion.loads(data, single_value=False)   # Loading as ion data
print(iondata['PersonId'])                            # should print "4tPW8xtKSGF5b6JyTihI1U"

Ion Cookbook中提供了有关使用离子库的更多指导

此外,我不确定您的用例,但与 QLDB 的交互也可以通过直接依赖于 Ion 库的QLDB 驱动程序来完成。

于 2020-07-10T01:40:21.927 回答
2

诺西菲韦,

AWS Glue能够读取 Amazon Ion 输入。但是,许多其他服务和应用程序不能,因此最好使用 Glue 将 Ion 数据转换为 JSON。注意,Ion 是 JSON 的超集,给 JSON 添加了一些数据类型,所以将 Ion 转换为 JSON 可能会导致一些向下转换

从 QLDB S3 导出访问 QLDB 文档的一种好方法是使用 Glue 提取文档数据,将其作为 JSON 存储在 S3 中,然后使用 Amazon Athena 进行查询。该过程如下:

  1. 将您的分类帐数据导出到 S3
  2. 创建Glue 爬网程序以对导出的数据进行爬网和编目。
  3. 运行Glue ETL 作业以从导出文件中提取修订数据,将其转换为 JSON,然后将其写入 S3。
  4. 创建一个Glue 爬虫来对提取的数据进行爬取和编目。
  5. 使用 Amazon Athena 查询提取的文档修订数据。

看看下面的 PySpark 脚本。它仅从 QLDB 导出文件中提取修订元数据和数据负载。

QLDB 导出映射每个文档的表,但与修订数据分开。您必须进行一些额外的编码才能在输出的修订数据中包含表名。下面的代码没有这样做,因此您最终会在输出的一个表中得到所有修订。

另请注意,您将获得导出数据中发生的任何修订。也就是说,您可能会获得给定文档 ID 的多个文档修订版本。根据您对数据的预期用途,您可能需要弄清楚如何仅获取每个文档 ID 的最新版本。

from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import col
from awsglue.dynamicframe import DynamicFrame

# Initializations
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Load data.  'vehicle-registration-ion' is the name of your database in the Glue catalog for the export data.  '2020' is the name of your table in the Glue catalog.
dyn0 = glueContext.create_dynamic_frame.from_catalog(database = "vehicle-registration-ion", table_name = "2020", transformation_ctx = "datasource0")

# Only give me exported records with revisions
dyn1 = dyn0.filter(lambda line: "revisions" in line)

# Now give me just the revisions element and convert to a Spark DataFrame.
df0 = dyn1.select_fields("revisions").toDF()

# Revisions is an array, so give me all of the array items as top-level "rows" instead of being a nested array field.
df1 = df0.select(explode(df0.revisions))

# Now I have a list of elements with "col" as their root node and the revision 
# fields ("data", "metadata", etc.) as sub-elements.  Explode() gave me the "col"
# root node and some rows with null "data" fields, so filter out the nulls.
df2 = df1.where(col("col.data").isNotNull())

# Now convert back to a DynamicFrame
dyn2 = DynamicFrame.fromDF(df2, glueContext, "dyn2")

# Prep and send the output to S3
applymapping1 = ApplyMapping.apply(frame = dyn2, mappings = [("col.data", "struct", "data", "struct"), ("col.metadata", "struct", "metadata", "struct")], transformation_ctx = "applymapping1")
datasink0 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://YOUR_BUCKET_NAME_HERE/YOUR_DESIRED_OUTPUT_PATH_HERE/"}, format = "json", transformation_ctx = "datasink0")

我希望这有帮助!

于 2020-07-22T23:44:46.710 回答