
We are trying to write a Spark streaming application that writes to HDFS. However, whenever we write files we end up with a large number of duplicate files. This happens whether or not we crash the application with a kill, and it occurs with both the DStream and the Structured Streaming APIs. The source is a Kafka topic. The behavior of the checkpoint directory seems random, and I haven't found much relevant information on this problem.

The question is: can the checkpoint directory provide exactly-once behavior?
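
To make the question concrete, by "checkpoint directory" I mean the checkpointLocation attached to the file-sink query, i.e. roughly the pattern in this minimal sketch (parsedDf stands in for the streaming DataFrame being written; the paths match the ones used in the full code further down):

    import org.apache.spark.sql.streaming.Trigger

    // parsedDf: the streaming DataFrame produced from the Kafka source
    val query = parsedDf.writeStream
        .outputMode("append")
        .format("orc")
        .option("path", "/warehouse/test_duplicate/download/data1")
        .option("checkpointLocation", "/warehouse/test_duplicate/download/checkpoint1")
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start()
    query.awaitTermination()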

Scala version: 2.11.8
Spark version: 2.3.1.3.0.1.0-187
Kafka version: 2.11-1.1.0
ZooKeeper version: 3.4.8-1
HDP: 3.1

Any help is appreciated. Thanks, Gautam

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object sparkStructuredDownloading {
    val kafka_brokers = "kfk01.*.com:9092,kfk02.*.com:9092,kfk03.*.com:9092"

    def main(args: Array[String]): Unit = {
        val topic = args(0).trim()
        new downloadingAnalysis(kafka_brokers, topic).process()
    }
}

class downloadingAnalysis(brokers: String, topic: String) {

    def process(): Unit = {
        val spark = SparkSession.builder()
            .appName("sparkStructuredDownloading")
            .getOrCreate()
        spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

        println("Application Started")
        import spark.implicits._

        // Read the stream from the Kafka topic
        val inputDf = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topic)
            .option("startingOffsets", "latest")
            //.option("kafka.group.id", "testduplicate")
            .load()

        // Kafka values arrive as binary; convert them to text
        val personJsonDf = inputDf.selectExpr("CAST(value AS STRING)")
        println("READ STREAM INITIATED")

        // Keep only the lines that pass validation
        val filteredDF = personJsonDf.filter(line => new ParseLogs().validateLogLine(line.get(0).toString()))

        // Register the parsing UDF; the actual parsing logic is not shown in the question
        spark.sqlContext.udf.register("parseLogLine", (logLine: String) => {
            logLine // placeholder for the omitted parsing logic
        })

        val df1 = filteredDF.selectExpr("parseLogLine(value) as result")
        println(df1.schema)
        println("WRITE STREAM INITIATED")

        // Write the parsed records to HDFS as ORC, using the checkpoint directory in question
        val checkpoint_loc = "/warehouse/test_duplicate/download/checkpoint1"
        val kafkaOutput = df1.writeStream
            .outputMode("append")
            .format("orc")
            .option("path", "/warehouse/test_duplicate/download/data1")
            .option("checkpointLocation", checkpoint_loc)
            .option("maxRecordsPerFile", 10)
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .start()

        kafkaOutput.awaitTermination()
    }
}
