I have a table ENTITLE_USER, and I want to select user IDs from it if they are not null, otherwise -1. For this I am using DB2's COALESCE function. I am reading my query in Spark like this:

val df1 = spark.read
        .format("jdbc")
        .options(Configuration.getDbConfMap)     //Specified all necessary DB properties here
        .option("dbtable", "(SELECT COALESCE(USER_ID,0) FROM DBNFMSXR.ENTITLE_USER) AS X")
        .load()
        .withColumnRenamed("1", "USER_ID")

When I do df1.printSchema, I get the following output, which is as expected:

root
 |-- USER_ID: integer (nullable = true)

Now, when I try df1.select("USER_ID").show(), it throws an Exception with a nice long stack trace:

com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13
    at com.ibm.db2.jcc.am.b6.a(b6.java:810)
    at com.ibm.db2.jcc.am.b6.a(b6.java:66)
    at com.ibm.db2.jcc.am.b6.a(b6.java:140)
    at com.ibm.db2.jcc.am.k3.c(k3.java:2824)
    at com.ibm.db2.jcc.am.k3.d(k3.java:2808)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2234)
    at com.ibm.db2.jcc.am.k4.a(k4.java:8242)
    at com.ibm.db2.jcc.am.k3.a(k3.java:2210)
    at com.ibm.db2.jcc.t4.ab.i(ab.java:201)
    at com.ibm.db2.jcc.t4.ab.b(ab.java:96)
    at com.ibm.db2.jcc.t4.p.a(p.java:32)
    at com.ibm.db2.jcc.t4.av.i(av.java:150)
    at com.ibm.db2.jcc.am.k3.al(k3.java:2203)
    at com.ibm.db2.jcc.am.k4.bq(k4.java:3730)
    at com.ibm.db2.jcc.am.k4.a(k4.java:4609)
    at com.ibm.db2.jcc.am.k4.b(k4.java:4182)
    at com.ibm.db2.jcc.am.k4.bd(k4.java:780)
    at com.ibm.db2.jcc.am.k4.executeQuery(k4.java:745)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/06/01 13:03:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13

20/06/01 13:03:33 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
20/06/01 13:03:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/06/01 13:03:33 INFO TaskSchedulerImpl: Cancelling stage 0
20/06/01 13:03:33 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
20/06/01 13:03:33 INFO DAGScheduler: ResultStage 0 (show at Test.scala:13) failed in 2.754 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13

Driver stacktrace:
20/06/01 13:03:33 INFO DAGScheduler: Job 0 failed: show at Test.scala:13, took 2.834929 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
    at me.sparker0i.before.Test$.delayedEndpoint$me$sparker0i$before$Test$1(Test.scala:13)
    at me.sparker0i.before.Test$delayedInit$body.apply(Test.scala:3)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:388)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at me.sparker0i.before.Test$.main(Test.scala:3)
    at me.sparker0i.before.Test.main(Test.scala)
Caused by: com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-206, SQLSTATE=42703, SQLERRMC=1, DRIVER=4.25.13

This appears to be the case for any DB2 query that uses COALESCE and is run through Spark JDBC.

Is there any way we can run queries that use COALESCE through Spark JDBC?

1 Answer

You are renaming the column 1 (which does not exist in the database) to USER_ID.
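
Most likely (this is my reading of the error, not something I have verified against DB2), when the select actually runs, Spark's column pruning pushes the underlying column name down to the database, generating SQL along the lines of:

SELECT "1" FROM (SELECT COALESCE(USER_ID,0) FROM DBNFMSXR.ENTITLE_USER) AS X

DB2 cannot resolve a column named 1 in the derived table, which matches the error: SQLCODE=-206, SQLSTATE=42703 (undefined column or parameter name), with SQLERRMC=1 naming the offending column.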

I'm not familiar with DB2, but the column name needs to be set in the SQL statement itself:

.option("dbtable", "(SELECT COALESCE(USER_ID,0) AS USER_ID FROM DBNFMSXR.ENTITLE_USER) AS X")

Omit the .withColumnRenamed("1", "USER_ID") call, and this will work.
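
Putting it together, a minimal sketch of the corrected read (reusing the Configuration.getDbConfMap helper from the question; untested, so treat it as a sketch rather than a verified fix):

val df1 = spark.read
        .format("jdbc")
        .options(Configuration.getDbConfMap)     // same DB connection properties as in the question
        .option("dbtable", "(SELECT COALESCE(USER_ID, 0) AS USER_ID FROM DBNFMSXR.ENTITLE_USER) AS X")
        .load()

df1.printSchema()                    // root |-- USER_ID: integer (nullable = true)
df1.select("USER_ID").show()         // the pushed-down query now references a real column name

Because the subquery now exposes a column that is genuinely named USER_ID, the SQL that Spark generates when pruning columns is resolvable by DB2, and no rename is needed on the Spark side.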

answered 2020-06-01T08:48:49.033