0

当我在本地使用 Spark 流应用程序时,每条记录只需要一次,但是,当我将它部署在独立集群上时,它会从 Kafka 读取两次相同的消息。另外,我已经仔细检查过这不是与 kafka 生产者有关的问题。

这就是我创建流的方式:

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
                             Subscribe[String, String]("aTopic", kafkaParams))

这是 kafkaParams 配置:

"bootstrap.servers" -> KAFKA_SERVER,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)

集群有 2 个工作人员,每个工作人员有一个执行程序,看起来每个工作人员都接收相同的消息。任何人都可以帮助我,好吗?

编辑

例如,当我从 kafka 发送一个点时。从这段代码:

    stream.foreachRDD((rdd, time) => {
          if (!rdd.isEmpty) {
            log.info("Taken "+rdd.count()+ " points")
        }
    }

我得到"Taken 2 points". 如果我打印它们,它们是平等的。难道我做错了什么?

我在用着

  • “org.apache.spark”%%“spark-streaming-kafka-0-10”%“2.2.0”
  • 火花2.2.0
  • kafka_2.11-0.11.0.1
4

0 回答 0