0

我正在使用 Flink 1.11 处理 Kinesis 应用程序,但在启动我的应用程序时出现以下错误:

java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 421563c271e57acb4592f9d447d45b42  timed out.
    at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1202)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

我在 Kinesis 和 4 KPU 中使用默认配置。

4

2 回答 2

0

从报错来看,TM和JM的心跳已经超时,或许可以给出Job的完整配置信息。

JM和TM之间超时的原因有很多,比如并发任务大、JM负载高等。

也许你也可以调整 heartbeat.timeout 参数,参考https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/

于 2021-12-29T16:03:37.280 回答
0

最后我解决了这个问题。就我而言,问题是我正在读取一个对于 Flink 工作人员来说太大的 CSVSource。我使用 dynamodb 作为数据源解决了这个问题。

于 2022-01-12T20:08:16.900 回答