5

基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。

如何为火花结构化流指定kafka消费者的组ID?

如何通过 Spark Structured Streaming 确保 kafka 数据摄取不丢失数据?

但是,我尝试了 spark 3.0 中的设置,如下所示。

package com.example

/**
 * @author ${user.name}
 */
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement


//import org.apache.spark.sql.hive.HiveContext

import scala.io.Source

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding


object App {
    
    def main(args: Array[String]): Unit = {
      
      val spark: SparkSession = SparkSession.builder()
        .appName("MY-APP")
        .getOrCreate()

      import spark.sqlContext.implicits._

      spark.catalog.clearCache()
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
      spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

      spark.sparkContext.setLogLevel("ERROR")
      spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")
      
      System.gc()
      
      val df = spark.readStream
        .format("kafka")
          .option("kafka.bootstrap.servers", "mybroker.io:6667")
          .option("subscribe", "mytopic")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.ssl.truststore.location", "/home/ec2-user/environment/spark/spark-local/creds/cacerts")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("kafka.ssl.truststore.type", "JKS")
          .option("kafka.sasl.kerberos.service.name", "kafka")
          .option("kafka.sasl.mechanism", "GSSAPI")
          .option("kafka.group.id","MYID")
          .load()

      df.printSchema()

      
      val schema = new StructType()
        .add("id", StringType)
        .add("x", StringType)
        .add("eventtime", StringType)

      val idservice = df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), schema).as("data"))
        .select("data.*")

       
      val monitoring_df = idservice
                .selectExpr("cast(id as string) id", 
                            "cast(x as string) x",
                            "cast(eventtime as string) eventtime")              

      val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
                                if(!batchDF.isEmpty) 
                                {
                                    batchDF.persist()
                                    printf("At %d, the %dth microbatch has %d records and %d partitions \n", Instant.now.getEpochSecond, batchId, batchDF.count(), batchDF.rdd.partitions.size)                                    
                                    batchDF.show()

                                    batchDF.write.mode(SaveMode.Overwrite).option("path", "/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
                                    spark.catalog.refreshTable("mytable")
                                    
                                    batchDF.unpersist()
                                    spark.catalog.clearCache()
                                }
                            }
                            .start()
                            .awaitTermination()
    }
   
}

使用以下 spark-submit 命令在独立模式下测试 spark 作业,但是当我在 AWS EMR 中以集群模式部署时存在同样的问题。

spark-submit --master local[1] --files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-localreds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab --driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.dynamicAllocation.enabled=false --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.yarn.maxAppAttempts=1000 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar

然后,我开始流式作业以从 Kafka 主题中读取流式数据。一段时间后,我杀死了火花工作。然后,我等待 1 小时重新开始工作。如果我理解正确,新的流数据应该从我终止 spark 作业时的偏移量开始。但是,它仍然以最新的偏移量开始,这在我停止作业期间导致数据丢失。

我是否需要配置更多选项以避免数据丢失?还是我对 Spark 3.0 有什么误解?谢谢!

问题解决了

这里的关键问题是检查点必须专门添加到查询中。仅仅为 SparkContext 添加检查点是不够的。添加检查点后,它正在工作。在checkpoint文件夹中,会创建一个offset子文件夹,里面包含offset文件,0,1,2,3....对于每个文件,都会显示不同分区的offset信息。

{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}}

一种建议是将检查点放在一些外部存储上,例如 s3。即使您需要重建 EMR 集群本身以防万一,它也可以帮助恢复偏移量。

4

1 回答 1

5

根据Spark Structured Integration Guide,Spark 本身会跟踪偏移量,并且没有将偏移量提交回 Kafka。这意味着如果您的 Spark Streaming 作业失败并且您重新启动它,所有关于偏移量的必要信息都存储在 Spark 的检查点文件中。

即使您将 ConsumerGroup 名称设置为kafka.group.id,您的应用程序仍然不会将消息提交回 Kafka。有关要读取的下一个偏移量的信息仅在 Spark 应用程序的检查点文件中可用。

如果您在没有重新部署的情况下停止并重新启动您的应用程序,并确保您没有删除旧的检查点文件,您的应用程序将从停止的地方继续读取。

在使用检查点从故障中恢复的 Spark Structured Streaming 文档中写道:

“如果发生故障或故意关闭,您可以恢复先前查询的先前进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。您可以使用检查点位置配置查询,并且查询将保存所有进度信息(即每个触发器中处理的偏移范围)[...]”

这可以通过在writeStream查询中设置以下选项来实现(在 SparkContext 配置中设置检查点目录是不够的)

.option("checkpointLocation", "path/to/HDFS/dir")

在文档中还指出“此检查点位置必须是HDFS 兼容文件系统中的路径,并且可以在启动查询时在 DataStreamWriter 中设置为选项。”

此外,Spark Structured Streaming 的容错能力还取决于您的输出接收器,如输出接收器部分所述。

由于您当前正在使用ForeachBatchSink,因此您的应用程序中可能没有重新启动功能。

在此处输入图像描述

于 2020-09-22T05:29:09.097 回答