
I am new to Spark Streaming and am trying to implement a Kafka–MongoDB integration, where my code pulls JSON data from a Kafka topic and inserts it into MongoDB. Below is my code:

package com.streams.sparkmongo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import com.mongodb.spark._

object MongoStream {

  def main(args: Array[String]) {

    //Initializing Spark
    val conf = SparkSession.builder()
      .master("spark://localhost:7077")
      .appName("MongoStream")
      .config("spark.mongodb.output.uri", "mongodb://localhost/test.myCollection")
      .getOrCreate()

    //Defining Kafka configuration parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "mongostream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "zookeeper.connect" -> "localhost:2181",
      "zookeeper.connection.timeout.ms" -> "10000",
      "zookeeper.session.timeout.ms" -> "10000")

    // Create a StreamingContext with a 10 second batch size from a SparkConf
    val ssc = new StreamingContext(conf.sparkContext, Seconds(10))

    //Defining Kafka topic
    val topics = Array("mongostreamtest")

    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val elementDstream = stream.map(v => v.value).foreachRDD { rdd =>
      import conf.implicits._
      //val payload = conf.read.json(rdd)
      rdd.saveToMongoDB()
    }
    ssc.start()
    ssc.awaitTermination
  }

}
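
For reference, my understanding is that the connector's saveToMongoDB() operates on an RDD of BSON Documents, so a minimal variant of the foreachRDD body that first parses each JSON string into a Document (using org.bson.Document.parse, which is an assumption on my part about how the strings would be converted) would look roughly like this. This is only a sketch of that conversion step, not a claim about what causes the timeout below:

    import org.bson.Document
    import com.mongodb.spark._

    stream.map(record => record.value).foreachRDD { rdd =>
      // Parse each JSON string pulled from Kafka into a BSON Document,
      // then write the batch through the MongoDB Spark connector using
      // the spark.mongodb.output.uri configured on the SparkSession.
      rdd.map(json => Document.parse(json)).saveToMongoDB()
    }
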

I am looping over each RDD and saving it to MongoDB with the helper method saveToMongoDB(). I am getting the error below. Any input would be appreciated.

WARN TaskSetManager: Lost task 0.2 in stage 9.0 (TID 11, <<ip>>, executor 0): com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=REPLICA_SET, servers=[{address=localhost:27017, type=REPLICA_SET_SECONDARY, roundTripTime=0.6 ms, state=CONNECTED}]
        at com.mongodb.connection.BaseCluster.createTimeoutException(BaseCluster.java:377)
        at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:104)
        at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
        at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
        at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68)
        at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:411)
        at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
        at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
        at com.mongodb.Mongo.execute(Mongo.java:845)
        at com.mongodb.Mongo$2.execute(Mongo.java:828)
        at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:338)
        at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:322)
        at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:119)
        at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:119)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:119)
        at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:118)
        at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
        at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
        at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
        at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
        at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
        at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
        at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
        at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:118)
        at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)