
I created an HA Flink v1.2 cluster made up of 1 JobManager and 2 TaskManagers, each in its own VM (not using YARN or HDFS). After I start a job on the JobManager node, I kill one TaskManager instance. Immediately afterwards, in the Web Dashboard, I can see the job being cancelled and then failing. The logs show:

03/06/2017 16:23:50 Flat Map(1/2) switched to DEPLOYING 
03/06/2017 16:23:50 Flat Map(2/2) switched to SCHEDULED 
03/06/2017 16:23:50 Flat Map(2/2) switched to DEPLOYING 
03/06/2017 16:23:50 Flat Map(1/2) switched to RUNNING 
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(1/2) switched to RUNNING 
03/06/2017 16:23:50 Flat Map(2/2) switched to RUNNING 
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(2/2) switched to RUNNING 
03/06/2017 16:25:38 Flat Map(1/2) switched to FAILED 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

03/06/2017 16:25:38 Job execution switched to status FAILING.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELING 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(2/2) switched to CANCELING 
03/06/2017 16:25:38 Flat Map(2/2) switched to CANCELING 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELED 
03/06/2017 16:26:18 Source: Custom Source -> Flat Map(2/2) switched to CANCELED 
03/06/2017 16:26:18 Flat Map(2/2) switched to CANCELED 

In the job implementation I have:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                             // number of restart attempts
        Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));
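As an aside, the same fixed-delay restart strategy can also be configured cluster-wide in `conf/flink-conf.yaml` instead of in the job code; a sketch using the standard Flink configuration keys (values here are just examples matching the snippet above):

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

A strategy set programmatically via `env.setRestartStrategy(...)` overrides the cluster-wide default.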

My question is: shouldn't the JobManager automatically redirect all requests to the remaining, running TaskManager? Similarly, if I start the JobManager and one TaskManager instance and run a job, shouldn't the 2nd TaskManager instance, once started, also contribute to the running job?

Thanks!


1 Answer


First of all, the RestartStrategy has nothing to do with HA mode. High availability concerns the JobManager. In any case, for HA to work you need at least two JobManager instances (you said you started only one).
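For reference, a minimal sketch of what JobManager HA requires in `conf/flink-conf.yaml` (the ZooKeeper quorum hosts and storage path below are placeholders; you also need to list both JobManager hosts in `conf/masters`):

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
high-availability.storageDir: file:///shared/flink/ha/
```

The storage directory must be on storage reachable by all JobManagers (e.g. a shared filesystem), since the standby JobManager recovers job metadata from it.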

As for the RestartStrategy: when you specify a fixedDelayRestart strategy, the job will be attempted again after a failure (such as killing a TaskManager, as in your case), in your case after 10 seconds. If that is not happening in your installation, you are probably lacking the resources to run the job. I suspect you have 1 task slot per TaskManager, so with only one TaskManager remaining you cannot run a job with parallelism 2 or higher.
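The resource check described above can be illustrated with a small sketch (this is illustrative plain Java, not Flink's actual scheduler code; `canRun` is a hypothetical helper): a restart attempt can only be scheduled if the surviving TaskManagers still provide at least as many task slots as the job's parallelism.

```java
public class SlotCheck {
    // Hypothetical helper: total available slots = taskManagers * slotsPerTaskManager.
    static boolean canRun(int parallelism, int taskManagers, int slotsPerTaskManager) {
        return parallelism <= taskManagers * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // With 2 TaskManagers x 1 slot each, a parallelism-2 job fits.
        System.out.println(canRun(2, 2, 1)); // true
        // After killing one TaskManager, the restart attempts cannot be scheduled.
        System.out.println(canRun(2, 1, 1)); // false
        // Lowering the parallelism to 1 would let the job restart on the survivor.
        System.out.println(canRun(1, 1, 1)); // true
    }
}
```

So with your setup, either configure more slots per TaskManager (`taskmanager.numberOfTaskSlots`) or run the job with parallelism 1 if you want it to survive the loss of one TaskManager.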

As for your last question: adding a TaskManager will not help the already-running job. Having it join in that way is a feature called dynamic scaling. You can achieve the same effect manually by taking a savepoint and then rerunning the job with more resources. Have a look here. Automatic rescaling is work in progress.
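The manual savepoint-and-rescale workflow looks roughly like this with the Flink CLI (the job id, savepoint path, and jar name are placeholders; run these against your live cluster):

```
# Trigger a savepoint for the running job:
bin/flink savepoint <jobId> file:///tmp/savepoints

# Cancel the job, then resubmit it from the savepoint with a higher
# parallelism once the extra TaskManager is up:
bin/flink cancel <jobId>
bin/flink run -s <savepointPath> -p 4 myJob.jar
```

The new parallelism must still fit within the total number of task slots the cluster now provides.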

Answered 2017-03-08T09:27:41.343