Python 2.7
PySpark 2.2.1
JDBC format for the MySQL -> Spark DF read
For writing the Spark DF -> AWS Redshift I am using the `Spark-Redshift` driver from Databricks.

For my application I am reading data from a MySQL table into Spark. Depending on context and an input parameter, I need to fetch either all records updated before today, or only the records updated from a requested date (inclusive) up to today.

df = (spark.read.format("jdbc")
      .option("url", "url")          # placeholder for the real JDBC URL
      .option("driver", driver)
      .option("dbtable", query)      # query is the subquery string built below
      .load())

where `query` is:

from datetime import date, timedelta

if days > 0:
    # records updated on/after (today - days) and before today
    get_date = date.today() - timedelta(days=days)
    query = "(SELECT * FROM {} WHERE CAST({}.updatedAt AS date) >= DATE('{}') " \
            "AND CAST({}.updatedAt AS date) < CURDATE()) AS t".format(table, table, get_date, table)
elif days == 0:
    # all records updated before today, plus records with no updatedAt at all
    query = "(SELECT * FROM {} WHERE CAST({}.updatedAt AS date) < CURDATE() " \
            "OR updatedAt IS NULL) AS t".format(table, table)

Once the data has been read into the Spark DataFrame, the timestamp column is dropped, since the rest of the ETL involves no other timestamp-related operations. The last step writes the transformed records to an AWS Redshift table.
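Roughly, the drop-and-write step looks like this (a minimal sketch; `redshift_jdbc_url`, `redshift_table` and `s3_temp_dir` stand in for my real configuration, and `forward_spark_s3_credentials` is just one of the ways the connector can be given S3 credentials):

df_out = df.drop("updatedAt")   # the timestamp column is not needed downstream

(df_out.write
    .format("com.databricks.spark.redshift")
    .option("url", redshift_jdbc_url)       # placeholder for the Redshift JDBC URL
    .option("dbtable", redshift_table)      # placeholder for the target table
    .option("tempdir", s3_temp_dir)         # S3 staging directory the connector requires
    .option("forward_spark_s3_credentials", "true")
    .mode("append")
    .save())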

My problem is that the application sometimes crashes with `Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp` while writing to Redshift. My guess is that the problem actually comes from the cast at read time, and that Spark's lazy execution only surfaces the exception when writing to Redshift (none of the target Redshift tables have any timestamp or date columns).
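To test that guess I have been thinking of forcing an action right after the read (a debugging sketch only; if the exception really came from the read-side cast, it should surface here instead of at write time):

df.printSchema()   # check what type Spark inferred for updatedAt from the JDBC read
print(df.count())  # forces the full JDBC read, so a read-side cast error would show up here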

Over the last month, across 4 different jobs that run daily, I have hit this exception in the logs roughly 15% of the time and the job then failed; the rest of the time it runs fine, which makes it impossible to reproduce the problem or debug it any further.

I suspect the String -> Timestamp cast in the SQL query is causing the problem, but I am not sure how to achieve the same result in another way that does not raise this exception. Any help is much appreciated!
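For reference, the kind of alternative I have in mind (only a sketch, I have not verified that it avoids the exception) would read the plain table and do the date filtering in Spark instead of inside the MySQL subquery:

from pyspark.sql import functions as F

df = (spark.read.format("jdbc")
      .option("url", "url")
      .option("driver", driver)
      .option("dbtable", table)   # plain table name, no casting subquery
      .load())

updated_date = F.to_date(F.col("updatedAt"))
if days > 0:
    cutoff = F.to_date(F.lit(str(get_date)))   # same get_date as above
    df = df.filter((updated_date >= cutoff) & (updated_date < F.current_date()))
elif days == 0:
    df = df.filter((updated_date < F.current_date()) | F.col("updatedAt").isNull())

I realise this may pull more data over JDBC before filtering, which is why I used the subquery in the first place.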

More of the stack trace:

py4j.protocol.Py4JJavaError: An error occurred while calling o827.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 601 in stage 93.0 failed 4 times, most recent failure: Lost task 601.3 in stage 93.0 (TID 5282, url, executor 5): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$7$$anonfun$apply$3.apply(RedshiftWriter.scala:234)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$7$$anonfun$apply$3.apply(RedshiftWriter.scala:233)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$8$$anonfun$apply$5.apply(RedshiftWriter.scala:252)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$8$$anonfun$apply$5.apply(RedshiftWriter.scala:248)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)