
I have an AWS Glue job set up that reads data from AWS Redshift using a JDBC connection.
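
For context, the read itself is roughly the sketch below; the database, table and connection names are placeholders, not the actual job:

    import sys
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
    glueContext = GlueContext(SparkContext())
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    # Read the Redshift table through the Glue Data Catalog over the JDBC connection.
    # 'medicare_db' and 'medicare_table' are placeholder names.
    medicare_dyf = glueContext.create_dynamic_frame.from_catalog(
        database="medicare_db",
        table_name="medicare_table",
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx="medicare_dyf")

    medicare_df = medicare_dyf.toDF()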

Column value as shown in DBeaver: 2020-05-08 12:36:53.000 +0530
Column value as shown in the Redshift query editor: 2020-05-08 07:06:53+00

Data type in Redshift: timestamp with time zone
Data type in the AWS Glue catalog table: timestamp

I wrote a crawler that maps this column to timestamp, but when I run the AWS Glue job I get an exception. I have tried several approaches. The code works fine when I read the values from a CSV instead of the Redshift table; when reading from the CSV, the crawler maps the data type to string. I also tried changing the Glue catalog table column to string, but that did not work either.
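
A quick way to compare what the crawler put in the catalog with what Spark actually receives over JDBC is a schema check like the sketch below (same placeholder names as above; any action that materializes rows runs the same conversion path that fails in the trace further down):

    # Schema as defined in the Glue catalog / DynamicFrame
    medicare_dyf.printSchema()

    # Schema as Spark sees the rows coming back from Redshift
    medicare_df = medicare_dyf.toDF()
    medicare_df.printSchema()
    medicare_df.select("updated_at", "created_at").show(5, truncate=False)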

Someone else ran into a similar problem, but the thread is old and has no working solution: https://github.com/databricks/spark-redshift/issues/391

Option 1: converting to string
    medicare_res_cast = medicare_dyf.resolveChoice(specs=[('updated_at', 'cast:string'), ('created_at', 'cast:string')])

Option 2: to_timestamp
    df.withColumn("updated_at", to_timestamp("updated_at")).show(truncate=False)

Option 3: split
    df.withColumn("updated_at", split(col("updated_at"), "+").getItem(0)).show()
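
For reference, a self-contained version of the three attempts, reusing the placeholder medicare_dyf/medicare_df from the sketch above, would look roughly like this. The only deliberate change is in Option 3: split() interprets its second argument as a regular expression, so a literal '+' has to be escaped as "\\+".

    from pyspark.sql.functions import col, split, to_timestamp

    # Option 1: cast both timestamp columns to string on the DynamicFrame
    medicare_res_cast = medicare_dyf.resolveChoice(
        specs=[('updated_at', 'cast:string'), ('created_at', 'cast:string')])

    # Option 2: let Spark re-parse the value as a timestamp
    medicare_df = medicare_dyf.toDF()
    medicare_df.withColumn("updated_at", to_timestamp("updated_at")).show(truncate=False)

    # Option 3: cut the string at the UTC offset; '+' is escaped because
    # split() treats the pattern as a regex
    medicare_df.withColumn(
        "updated_at", split(col("updated_at"), "\\+").getItem(0)).show()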

ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
  File "/tmp/etl-custom-redshift", line 36, in <module>
    medicare_df.withColumn("updated_at", split(col("updated_at"), "+").getItem(0)).show()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o92.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.27.144, executor 1): java.lang.NumberFormatException: For input string: "53+00"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at java.sql.Timestamp.valueOf(Timestamp.java:259)
    at com.databricks.spark.redshift.Conversions$$anonfun$1$$anonfun$apply$11.apply(Conversions.scala:108)
    at com.databricks.spark.redshift.Conversions$$anonfun$1$$anonfun$apply$11.apply(Conversions.scala:108)
    at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:120)
    at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:116)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:104)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    ... 1 more