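I am running a Streaming job on EMR that reads from SQS. After running for 10 minutes, it starts throwing the errors below in the background (the application itself keeps running), which fills the logs with noise.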

2019-12-09 04:00:00,391 ERROR [JobGenerator] org.apache.spark.streaming.CheckpointWriter:Could not submit checkpoint task to the thread pool executor
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@7d4ff43b rejected from java.util.concurrent.ThreadPoolExecutor@7e3a352[Running, pool size = 1, active threads = 1, queued tasks = 1000, completed tasks = 75046]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:291)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
2019-12-09 04:00:00,389 INFO [JobScheduler] org.apache.spark.streaming.scheduler.JobScheduler:Starting job streaming job 1575864000000 ms.0 from job set of time 1575864000000 ms
2019-12-09 04:00:00,422 INFO [JobScheduler] org.apache.spark.streaming.scheduler.JobScheduler:Finished job streaming job 1575864000000 ms.0 from job set of time 1575864000000 ms
2019-12-09 04:00:00,422 INFO [JobScheduler] org.apache.spark.streaming.scheduler.JobScheduler:Total delay: 0.422 s for time 1575864000000 ms (execution: 0.033 s)
2019-12-09 04:00:00,423 INFO [JobGenerator] org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD:Removing RDD 106143 from persistence list
2019-12-09 04:00:00,423 INFO [JobGenerator] org.apache.spark.streaming.dstream.PluggableInputDStream:Removing blocks of RDD WriteAheadLogBackedBlockRDD[106143] at receiverStream at StreamingSQSApp.scala:139 of time 1575864000000 ms
2019-12-09 04:00:00,423 INFO [JobGenerator] org.apache.spark.streaming.scheduler.JobGenerator:Checkpointing graph for time 1575864000000 ms
2019-12-09 04:00:00,423 INFO [JobGenerator] org.apache.spark.streaming.DStreamGraph:Updating checkpoint data for time 1575864000000 ms
2019-12-09 04:00:00,423 INFO [JobGenerator] org.apache.spark.streaming.DStreamGraph:Updated checkpoint data for time 1575864000000 ms
2019-12-09 04:00:00,424 ERROR [JobGenerator] org.apache.spark.streaming.CheckpointWriter:Could not submit checkpoint task to the thread pool executor
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@7a73bfc9 rejected from java.util.concurrent.ThreadPoolExecutor@7e3a352[Running, pool size = 1, active threads = 1, queued tasks = 1000, completed tasks = 75046]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:291)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:297)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
2019-12-09 04:00:00,578 INFO [pool-26-thread-1] org.apache.spark.streaming.CheckpointWriter:Deleting s3://bucket/streaming/checkpoint/checkpoint-1575859114000
2019-12-09 04:00:00,729 INFO [pool-26-thread-1] org.apache.spark.streaming.CheckpointWriter:Checkpoint for time 1575859160000 ms saved to file 's3://bucket/streaming/checkpoint/checkpoint-1575859160000', took 10896 bytes and 4907 ms
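
To put the numbers in the log above side by side, here is a minimal sketch that only restates values copied from the excerpt. It assumes the batch and checkpoint timestamps are epoch milliseconds and that every queued write takes about as long as the one logged. The object name `CheckpointLag` and its members are hypothetical, introduced only for illustration, and the sketch does not by itself identify the cause.

```scala
object CheckpointLag {
  // Values copied from the log excerpt; times are assumed to be epoch milliseconds.
  val currentBatchMs: Long   = 1575864000000L // batch time in the ERROR/INFO lines
  val lastSavedBatchMs: Long = 1575859160000L // "Checkpoint for time ... saved to file"
  val writeDurationMs: Long  = 4907L          // "took 10896 bytes and 4907 ms"
  val queuedTasks: Int       = 1000           // "queued tasks = 1000"

  // How far the most recently saved checkpoint trails the current batch.
  def lagMs: Long = currentBatchMs - lastSavedBatchMs

  // Time to drain the full queue if every write took writeDurationMs (an assumption).
  def drainMs: Long = queuedTasks.toLong * writeDurationMs

  def main(args: Array[String]): Unit = {
    println(s"checkpoint lag: ${lagMs / 1000} s")                // 4840 s
    println(s"drain time for a full queue: ${drainMs / 1000} s") // 4907 s
  }
}
```

Under those assumptions, the saved checkpoint trails the current batch by about 81 minutes, which is close to the time a single writer thread would need to work through 1000 queued writes at roughly 4.9 s each.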

Here is the Spark conf supplied to the application:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("Spark Streaming using SQS")
  .set("spark.scheduler.mode", "FAIR")
  // NOTE: the documented key is spark.sql.parquet.filterPushdown
  .set("spark.sql.parquet.filterpushdown", "true")
  // NOTE: the documented key is spark.executor.heartbeatInterval
  .set("spark.executor.hearbeatInterval", "20")
  // Close write-ahead log files after each write
  .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
  .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
  // Persist received data to a write-ahead log
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.streaming.backpressure.enabled", "true")