I am using Spark 3.0.1 with Kafka to aggregate data for 125,000,000 users, at roughly 200,000 records per second. The output mode is "append" and the sink database is MongoDB: Spark reads from Kafka and writes the aggregated results to Mongo. However, the application spends a large amount of time in GC. Why does the application use so much memory? Is there a way to release memory promptly, or is something wrong with my configuration?
import org.apache.spark.sql.functions._
import spark.implicits._

// Read the raw user records from Kafka
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaConn)
  .option("subscribe", "user_topic")
  .load()

// Parse the JSON payload, then aggregate octCount per subscriber/app
// over 15-minute tumbling windows with a 1-minute watermark
val query = df
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  .as[(String, String)]
  .select(from_json($"value", jsonSchema).as("data"))
  .selectExpr(
    "data.AgentID", "data.subscriber", "data.app", "data.octCount", "data.Time"
  )
  .selectExpr("cast(Time/1000 as TIMESTAMP) as Time", "subscriber", "app", "octCount")
  .withWatermark("Time", "1 minutes")
  .groupBy(window($"Time", "15 minutes", "15 minutes"), $"subscriber", $"app")
  .agg(sum(col("octCount")) as "oct_count")
  .selectExpr("cast(window.start as LONG)*1000 as time", "subscriber", "app", "oct_count")
  .writeStream
  .outputMode("append")
The spark-submit parameters are:
--num-executors 2 \
--executor-cores 8 \
--executor-memory 26G \
--conf spark.default.parallelism=16 \
--conf spark.sql.shuffle.partitions=16 \
--conf spark.memory.fraction=0.1 \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=6"
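To quantify how much memory the windowed aggregation state holds per batch, the state-store metrics from each progress update can be logged with a StreamingQueryListener; a minimal sketch (it simply prints the metrics instead of using a real logger):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Print the number of state rows and the state memory reported for every micro-batch
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    p.stateOperators.foreach { s =>
      println(s"batch=${p.batchId} stateRows=${s.numRowsTotal} stateMemoryBytes=${s.memoryUsedBytes}")
    }
  }
})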