3

考虑以下 aws 粘合作业代码:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
    database = "my_database",
    table_name = "my_table")
medicare_dynamicframe.printSchema()

job.commit()

它打印出类似的东西(注意它price_key不在第二个位置):

root
|-- day_key: string
...
|-- price_key: string

my_tabledatalake 中,定义为day_keyas int(第一列)和price_keyas decimal(25,0)(第二列)。

可能是我错了,但我从源代码中发现 aws glue 使用表和数据库来获取数据的 s3 路径,但完全忽略任何类型定义。可能适用于某些数据格式,例如parquet它是正常的,但不适用于csv.

如何配置 aws 胶水以从数据湖表定义中为带有 csv 的动态框架设置模式?

4

0 回答 0