1

我想通过 Kudu API 编写和更新。这是maven依赖:

<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-spark2_2.11</artifactId>
  <version>1.1.0</version>
</dependency>

在下面的代码中,我不知道KuduContext参数。

我在 spark2-shell 中的代码:

val kuduContext = new KuduContext("master:7051") 

Spark 2.1 流式传输中的相同错误:

import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
val sparkConf = new SparkConf().setAppName("DirectKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val messages = KafkaUtils.createDirectStream("")
messages.foreachRDD(rdd => {
   val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
   import spark.implicits._
   val bb = spark.read.options(Map("kudu.master" -> "master:7051","kudu.table" -> "table")).kudu //good 
   val kuduContext = new KuduContext("master:7051") //error
})

然后是错误:

org.apache.spark.SparkException:此 JVM 中只能运行一个 SparkContext(请参阅 SPARK-2243)。要忽略此错误,请设置 spark.driver.allowMultipleContexts = true。当前运行的 SparkContext 创建于:org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)

4

1 回答 1

1

将您的 Kudu 版本更新到最新版本(当前为 1.5.0)。在更高版本KuduContext中将SparkContext用作输入参数,这应该可以防止此问题。

此外,在foreachRDD. 在您提供的代码中,将 foreach 移出sparkkuduContext移出。此外,您不需要创建单独的sparkConf,您可以只使用较新的SparkSession

val spark = SparkSession.builder.appName("DirectKafka").master("local[*]").getOrCreate()
import spark.implicits._

val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val bb = spark.read.options(Map("kudu.master" -> "master:7051", "kudu.table" -> "table")).kudu

val messages = KafkaUtils.createDirectStream("")
messages.foreachRDD(rdd => {   
  // do something with the bb table and messages       
})
于 2018-01-10T02:45:22.187 回答