1

Spark Structured Streaming 代码时出现以下异常

18/12/05 15:00:38 错误 StreamExecution:查询 [id = 48ec92a0-811a-4d57-a65d-c0b9c754e093,runId = 5e2adff4-855e-46c6-8592-05e3557544c6] 以错误 java.lang.ClassCastException: org. apache.spark.sql.execution.streaming.SerializedOffset 无法在 org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:152 ) 在 org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:614 )

每次我启动查询时都会发生此异常。当我在删除检查点后启动它时它确实有效。

Spark 结构化流代码如下,基本上我只是从 MQTT 队列中读取数据并写入 ElasticSearch 索引。

spark
  .readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "Employee")
  .option("username", "username")
  .option("password", "password")
  .option("clientId", "employee11")
  .load("tcp://localhost:8000")
  .as[(String, Timestamp)]
  .writeStream
  .outputMode("append")
  .format("es")
  .option("es.resource", "spark/employee")
  .option("es.nodes", "localhost")
  .option("es.port", 9200)
  .start()
  .awaitTermination()

以下是使用的依赖项。我使用 MapR 分发。

  "org.apache.spark" %% "spark-core" % "2.2.1-mapr-1803",
  "org.apache.spark" %% "spark-sql" % "2.2.1-mapr-1803",
  "org.apache.spark" %% "spark-streaming" % "2.2.1-mapr-1803",
  "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.2.1",
  "org.apache.bahir" %% "spark-streaming-mqtt" % "2.2.1",
  "org.elasticsearch" %% "elasticsearch-spark-20" % "6.3.2"

火花提交命令

/opt/mapr/spark/spark-2.2.1/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars spark-sql-streaming-mqtt_2.11-2.2.1.jar,org.eclipse.paho.client.mqttv3-1.1.0.jar,elasticsearch-spark-20_2.11-6.3.2.jar,mail-1.4.7.jar myjar_2.11-0.1.jar \
  --class <MAIN_CLASS>

对此的任何帮助将不胜感激。

4

1 回答 1

1

这似乎是 Apache Bahir 中的一个错误

于 2018-12-11T15:25:28.817 回答