我有一个要求,我使用 DStream 从 Kafka 检索消息。现在在获取消息或 RDD 之后,我现在使用映射操作在执行程序上独立处理消息。我面临的一个挑战是我需要从执行程序中读取/写入配置单元表,为此我需要访问 SQLContext。但据我所知,SparkSession 仅在驱动程序端可用,不应在执行程序中使用。现在没有 spark 会话(在 spark 2.1.1 中),我无法掌握 SQLContext。总结一下我的驱动程序代码如下所示:
if (inputDStream_obj.isSuccess) {
val inputDStream = inputDStream_obj.get
inputDStream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
val rdd1 = rdd.map(idocMessage => SegmentLoader.processMessage(props, idocMessage.value(), true))
}
}
所以在这个 rdd.map 之后,下一个代码在 executors 上执行,我有类似的东西:
val sqlContext = spark.sqlContext
import sqlContext.implicits._
spark.sql("USE " + databaseName)
val result = Try(df.write.insertInto(tableName))
传递 sparksession 或 sqlcontext 在执行器上使用时会出错:
当我尝试获取现有的 sparksession 时:
org.apache.spark.SparkException: A master URL must be set in your configuration
当我广播会话变量时:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 9, <server>, executor 2): java.lang.NullPointerException
当我通过 sparksession 对象时:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 9, <server>, executor 2): java.lang.NullPointerException
如果您可以建议如何从执行程序中查询/更新配置单元表,请告诉我。
谢谢,里特威克