我正在从Flink Docs测试以下内容
可以删除字段。删除后,已删除字段的先前值将在以后的检查点和保存点中删除。POJO类
我遵循的步骤:
- 按照给定的规则定义 POJO 类, 例如
@Data
@NoArgsConstructor
public class SomePojo {
@NonNull String field1;
@NonNull String field2;
@NonNull String field3;
public static TypeInformation<SomePojo > getProducedType() {
return TypeInformation.of(SomePojo .class);
}
}
- 将 Window 运算符和其中的 MapState 定义为
private transient MapState<String, SomePojo> mapState; // declared class level field
MapStateDescriptor<String, SomePojo> mapStateDescriptor =
new MapStateDescriptor<>("state",Types.String, SomePojo.getProducedType()); // defined descriptor
- 在 KDA 中部署应用程序,现在通过某个键将 SomePojo 对象的值存储在 MapState 中
mapstate.put("someString", objectOfSomePojoClass);
- 更改 SomePojo 以删除第三个字段
@Data
@NoArgsConstructor
public class SomePojo {
@NonNull String field1;
@NonNull String field2;
// @NonNull String field3; Commented out 3rd field to check schema evolution
public static TypeInformation<SomePojo > getProducedType() {
return TypeInformation.of(SomePojo .class);
}
}
- 重复第 3 步。但是当 flink 启动它的抛出异常如下
2021-06-10 19:22:40
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_8a1a38027b51a601bd831c52e46e25fa_(1/2) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
at org.apache.flink.api.common.typeutils.CompositeSerializer$PrecomputedParameters.precompute(CompositeSerializer.java:228)
at org.apache.flink.api.common.typeutils.CompositeSerializer.<init>(CompositeSerializer.java:51)
at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer.<init>(TtlStateFactory.java:250)
at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:359)
at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:330)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:137)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:71)
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:153)
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:136)
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.getOrRegisterStateColumnFamilyHandle(AbstractRocksDBRestoreOperation.java:161)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:191)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 15 more
标注
- 我已经测试过添加新字段可以正常工作,并且还设置了默认值。
- 我使用的运行时是 Flink-1_11
- 我已经在https://issues.apache.org/jira/browse/FLINK-11947看到 mapstate 的模式演变已经修复