0

正如标题所述,我无法将动态框架上的列从 Epoch 转换为时间戳。

我已尝试转换为数据框并返回动态框,但它不起作用。

这就是我正在使用的:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.functions import udf

from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "s3-sat-dth-prd", table_name = "s3_sat_dth_prd_vehicle", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("in", "int", "in", "int"), ("out", "int", "out", "int"), ("ts", "long", "ts", "long"), ("cam", "string", "cam", "string"), ("subclass", "string", "subclass", "string")], transformation_ctx = "applymapping1")   

selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["in", "out", "ts", "cam", "subclass"], transformation_ctx = "selectfields2")

resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "s3-sat-dth-prd", table_name = "test_split_array_into_records_json", transformation_ctx = "resolvechoice3")

datasink4 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice3, database = "s3-sat-dth-prd", table_name = "test_split_array_into_records_json", transformation_ctx = "datasink4")
job.commit()

我尝试的是创建一个 Data Frametsconvert = resolvechoice3.toDF()并将其转回 Dynamic Frame resolvechoice4 = DynamicFrame.fromDF(tsconvert, GlueContext, resolvechoice4);我在最后粘贴到resolvechoice4.

找不到 Glue 中是否有任何东西可以转换为时间戳。当我确保数据正确写入 S3 时,Redshift 将成为我的目标。

有没有人做过这样的事情并且可以引导我?

提前致谢。

4

1 回答 1

1

AWS Glue 具有 SQL 函数(通过 pyspark 包导入),允许将纪元时间戳转换为人类可读或所需的日期格式。

例子:

from pyspark.sql.functions import from_unixtime, unix_timestamp, col

resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "s3-sat-dth-prd", table_name = "test_split_array_into_records_json", transformation_ctx = "resolvechoice3")

tsconvert = resolvechoice3.toDF()
tsconverted= tsconvert.withColumn(col(tsColumnName),from_unixtime(col(tsColumnName)))
resolvechoice4 = DynamicFrame.fromDF(tsconverted, glue_context,"transformedDF")

根据您的需要,您可以使用pyspark.sql.functions类中的日期函数以类似的方式定义日期格式。

于 2021-02-25T17:39:34.737 回答