0

背景:我一直在尝试在部署在 kinesis 分析运行时的同一个 flink 应用程序中设置 BATCH + STREAMING。STREAMING 部分工作正常,但我无法添加对 BATCH 的支持。

Flink:处理数据早于应用程序水印的键控流

Apache Flink:Datastream API 的批处理模式失败,异常“IllegalStateException:排序输入不允许检查点。”

逻辑是这样的:

The logic is something like this :

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
                .<DetectionEvent>forBoundedOutOfOrderness(orderness)
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);

在这样做时,我得到以下异常:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_90bea66de1c231edf33913ecd54406c1_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 10 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:306)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:426)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 12 more
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:672)
    at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:521)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:302)
    ... 17 more

似乎 kinesis-analytics 不允许客户端定义 flink-conf.yaml 文件来定义taskmanager.memory.managed.consumer-weights. 有没有办法解决 ?

4

1 回答 1

0

我不清楚这个异常的根本原因是什么,也不清楚如何在 KDA 上进行批处理。

你可以试试这个(但我不确定 KDA 是否允许):

Configuration conf = new Configuration();
conf.setString("taskmanager.memory.managed.consumer-weights", "put-the-value-here");

StreamExecutionEnvironment env =
  StreamExecutionEnvironment.getExecutionEnvironment(conf);
于 2021-11-30T21:58:44.547 回答