I am trying to redeploy an application, but it fails on Flink 1.10.1 with the issue below.

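Below are the application configuration and the error log I get when deploying the application. This configuration worked fine until recently, when it started failing.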

I checked the checkpoint directory in S3: the most recent folders and metadata it contains are dated January 6. There is no data after that.
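
The manual S3 check above could be scripted roughly as follows. This is only a sketch: it assumes the usual Flink layout of `<state.checkpoints.dir>/<job-id>/chk-<n>/_metadata`, it works on a plain list of object keys (obtaining that list from S3 is left out), and `JOB_ID` and the helper name `latest_completed_checkpoint` are placeholders, not real values from the cluster.

```python
import re

# A checkpoint is treated as complete only if its chk-<n> folder has a _metadata file.
CHK_RE = re.compile(r"(?:^|/)chk-(\d+)/_metadata$")


def latest_completed_checkpoint(keys):
    """Return the chk-<n> prefix with the highest n that has a _metadata object, or None."""
    best = None
    for key in keys:
        match = CHK_RE.search(key)
        if match is None:
            continue
        number = int(match.group(1))
        if best is None or number > best[0]:
            # Strip the trailing "/_metadata" to get the checkpoint folder itself.
            best = (number, key[: -len("/_metadata")])
    return None if best is None else best[1]


# Hypothetical listing; JOB_ID stands in for the real job id.
SAMPLE_KEYS = [
    "flink/scl-flink-cluster/checkpoints/JOB_ID/chk-41/_metadata",
    "flink/scl-flink-cluster/checkpoints/JOB_ID/chk-42/_metadata",
    "flink/scl-flink-cluster/checkpoints/JOB_ID/chk-43/some-state-file",
    "flink/scl-flink-cluster/checkpoints/JOB_ID/shared/abc",
]

if __name__ == "__main__":
    print(latest_completed_checkpoint(SAMPLE_KEYS))
```

In the sample listing, `chk-43` is skipped because it has no `_metadata` object, which is how an incomplete checkpoint would be told apart from a usable one.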

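RocksDB is configured as the state backend. We deploy our application on a Kubernetes cluster. As far as I know, RocksDB is bundled with Flink, so we should not need to do anything beyond setting the Flink configuration.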

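I am reaching out to a wider audience so that anyone who has hit the same issue, specifically in a K8s environment, can share their experience.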

Flink Config:
    high-availability: zookeeper
    high-availability.cluster-id: /flink
    high-availability.zookeeper.path.root: flink

    high-availability.storageDir: s3://bucket1/flink/scl-flink-cluster/jobmanager
    high-availability.jobmanager.port: 50010

    high-availability.zookeeper.client.acl: open
    high-availability.zookeeper.quorum: scl-flink-cluster-zk-client:2181
    zookeeper.sasl.disable: true



    state.checkpoints.dir: s3://bucket1/flink/scl-flink-cluster/checkpoints
    state.savepoints.dir: s3://bucket1/flink/scl-flink-cluster/savepoints
    state.backend.async: true
    state.backend.fs.memory-threshold: 1024
    state.backend.fs.write-buffer-size: 4096
    state.backend.incremental: false
    state.backend.local-recovery: false
    state.checkpoints.num-retained: 1

    taskmanager.numberOfTaskSlots: 16
    
    state.backend: rocksdb
    state.backend.async: true
    state.backend.fs.memory-threshold: 1024
    state.backend.fs.write-buffer-size: 4096
    state.backend.incremental: true
    state.checkpoints.num-retained: 1
    fs.hdfs.hadoopconf: /etc/hadoop/conf
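
Several keys in the config above appear twice, and `state.backend.incremental` is set to both `false` and `true`. I have not confirmed which value Flink actually uses; the stack trace below does go through `RocksDBIncrementalRestoreOperation`, which suggests the checkpoint being restored was incremental. A minimal sketch for spotting such duplicates, assuming the flat `key: value` format shown above (the helper name `find_duplicate_keys` is made up for illustration and is not part of Flink):

```python
from collections import defaultdict


def find_duplicate_keys(conf_text):
    """Return {key: [values in file order]} for keys that occur more than once."""
    seen = defaultdict(list)
    for raw in conf_text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments.
        if not line or line.startswith("#"):
            continue
        # Split on the first ": " so values such as s3://... stay intact.
        key, sep, value = line.partition(": ")
        if not sep:
            continue
        seen[key].append(value.strip())
    return {key: values for key, values in seen.items() if len(values) > 1}


# Excerpt of the config from this post.
SAMPLE = """
state.backend.incremental: false
state.checkpoints.num-retained: 1
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.num-retained: 1
"""

if __name__ == "__main__":
    for key, values in find_duplicate_keys(SAMPLE).items():
        print(key, values)
```

Keys that show up with differing values, as `state.backend.incremental` does here, are the ones worth cleaning up first.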

Error log:

    2022-01-18 04:18:34,366 INFO  org.apache.flink.runtime.taskmanager.Task                     - feedback-union -> functions -> (Sink: adas-walle-output-egress-egress, Sink: adas-derivation-adas-metrics-sink-egress) (31/32) (59642ed0ee77c43f38a3e94a38438158) switched from CANCELING to CANCELED.
    2022-01-18 04:18:34,379 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for feedback-union -> functions -> (Sink: adas-walle-output-egress-egress, Sink: adas-derivation-adas-metrics-sink-egress) (31/32) (59642ed0ee77c43f38a3e94a38438158).
    2022-01-18 04:18:34,379 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task feedback-union -> functions -> (Sink: adas-walle-output-egress-egress, Sink: adas-derivation-adas-metrics-sink-egress) (31/32) (59642ed0ee77c43f38a3e94a38438158) [CANCELED]
    2022-01-18 04:18:34,382 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state CANCELED to JobManager for task feedback-union -> functions -> (Sink: adas-walle-output-egress-egress, Sink: adas-derivation-adas-metrics-sink-egress) (31/32) 59642ed0ee77c43f38a3e94a38438158.
    2022-01-18 04:18:34,703 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  - Caught unexpected exception.
    java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
        at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
        at java.nio.channels.Channels.writeFully(Channels.java:101)
        at java.nio.channels.Channels.access$000(Channels.java:61)
        at java.nio.channels.Channels$1.write(Channels.java:174)
        at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
        at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
        at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
        at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
        at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
        at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
        at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreDBInstanceFromStateHandle(RocksDBIncrementalRestoreOperation.java:409)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:296)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:152)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
    2022-01-18 04:18:34,704 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception while restoring keyed state backend for FunctionGroupOperator_6b87a4870d0e21cecbbe271bd893cfcc_(32/32) from alternative (1/1), will retry while more alternatives are available.
    org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
        at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
        at java.nio.channels.Channels.writeFully(Channels.java:101)
        at java.nio.channels.Channels.access$000(Channels.java:61)
        at java.nio.channels.Channels$1.write(Channels.java:174)
        at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
        at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
        at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
        at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
        at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
        at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
        at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreDBInstanceFromStateHandle(RocksDBIncrementalRestoreOperation.java:409)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:296)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:152)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)