
I am using Spark 3.0.1 with Kafka to aggregate data for 125,000,000 users, at roughly 200,000 records per second. The output mode is "append" and the sink is MongoDB: Spark reads from Kafka and writes the aggregated results to Mongo. However, the application spends a lot of time in GC. Why does it use so much memory? Is there a way to release memory in time, or is something wrong with my configuration?

import org.apache.spark.sql.functions._   // from_json, window, sum, col
import spark.implicits._                   // enables $"..." and the (String, String) encoder

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaConn)
  .option("subscribe", "user_topic")
  .load()
val query = df
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  .as[(String, String)]
  .select(from_json($"value", jsonSchema).as("data"))
  .selectExpr(
    "data.AgentID", "data.subscriber", "data.app", "data.octCount","data.Time"
  ).selectExpr("cast(Time/1000 as TIMESTAMP) as Time", "subscriber", "app", "FD", "octCount")
  .withWatermark("Time", "1 minutes")
  .groupBy(window($"Time", "15 minutes", "15 minutes"), $"subscriber", $"app")
  .agg(sum(col("octCount")) as "oct_count")
  .selectExpr("cast(window.start as LONG)*1000 as time", "subscriber", "app", "oct_count")
  .writeStream
  .outputMode("append")

The run parameters are:

--num-executors 2 --executor-cores 8 --executor-memory 26G --conf spark.default.parallelism=16 --conf spark.sql.shuffle.partitions=16 --conf spark.memory.fraction=0.1 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=6"
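
Assembled into a full launch command (the master, main class, and jar name below are placeholders, not part of the original post), the submission might look like this:

spark-submit \
  --master yarn \
  --class com.example.UserAggregation \
  --num-executors 2 \
  --executor-cores 8 \
  --executor-memory 26G \
  --conf spark.default.parallelism=16 \
  --conf spark.sql.shuffle.partitions=16 \
  --conf spark.memory.fraction=0.1 \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=6" \
  user-aggregation.jar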


