1

我正在尝试设置 Spark Streaming 以读取 MQTT 源,但是当我收到第二条消息时它会启动异常。

我有以下代码:

import java.sql.Timestamp

import org.apache.bahir.sql.streaming.mqtt._
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.types.StructType

object App {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Read text from socket
    val lines = spark.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider").option("topic", "measurement").option("username", "spark").option("password", "******").load("tcp://10.0.0.129:1883").as[(String, Timestamp)]

    val query = lines.writeStream.format("console").start

    query.awaitTermination()

  }
}

当我收到第二条消息时,我观察到以下异常:

17/01/16 16:18:33 ERROR StreamExecution: Query query-1 terminated with error
java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142)
Exception in thread "stream execution thread for query-1" java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142)
org.apache.spark.sql.streaming.StreamingQueryException: Query query-1 terminated with exception: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:248)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142)
Caused by: java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208)
  ... 1 more

有人遇到过这个问题吗?

4

1 回答 1

-1

您的 spark-MQTT jar 是否与您的 spark 版本相同?不同的版本可能会导致这些问题。我在使用 Cloudera Express 的 spark 1.6 时遇到了类似的问题。升级到相同版本后问题解决了

于 2017-01-17T14:15:26.237 回答