0

我有一个要求,我使用 DStream 从 Kafka 检索消息。现在在获取消息或 RDD 之后,我现在使用映射操作在执行程序上独立处理消息。我面临的一个挑战是我需要从执行程序中读取/写入配置单元表,为此我需要访问 SQLContext。但据我所知,SparkSession 仅在驱动程序端可用,不应在执行程序中使用。现在没有 spark 会话(在 spark 2.1.1 中),我无法掌握 SQLContext。总结一下我的驱动程序代码如下所示:

if (inputDStream_obj.isSuccess) {
      val inputDStream = inputDStream_obj.get
      inputDStream.foreachRDD(rdd => {

        if (!rdd.isEmpty) {
           val rdd1 = rdd.map(idocMessage => SegmentLoader.processMessage(props, idocMessage.value(), true))
       }
}

所以在这个 rdd.map 之后,下一个代码在 executors 上执行,我有类似的东西:

val sqlContext = spark.sqlContext
 import sqlContext.implicits._
 spark.sql("USE " + databaseName)
 val result = Try(df.write.insertInto(tableName))

传递 sparksession 或 sqlcontext 在执行器上使用时会出错:

  • 当我尝试获取现有的 sparksession 时:org.apache.spark.SparkException: A master URL must be set in your configuration

  • 当我广播会话变量时:User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 9, <server>, executor 2): java.lang.NullPointerException

  • 当我通过 sparksession 对象时:User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 9, <server>, executor 2): java.lang.NullPointerException

如果您可以建议如何从执行程序中查询/更新配置单元表,请告诉我。

谢谢,里特威克

4

0 回答 0