我正在阅读 2 个流。一个带有记录,一个带有元数据。
我第一次希望我的应用程序通过扫描完整的表来构建元数据并将其保存到 Flink 的 MapState。表上的更新将通过元数据流捕获,并且 MapState 将相应地更新。
从第二次开始,我想使用 MapState 而不是读取整个表。
下面是我对这个功能的实现,但是我的 MapState 总是空的,我在这里做错了吗?
public class CustomCoFlatMap extends RichCoFlatMapFunction<Record, Metadata, Output> {
private transient DataSource datasource;
private transient MapState<String, Metadata> metadataState;
@Inject
public void setDataSource(DataSource datasource) {
this.datasource = datasource;
}
@Override
public void open(Configuration parameters) throws Exception {
final RichFunctionComponent component = DaggerRichFunctionComponent.builder()
.richFunctionModule(RichFunctionModule.builder()
.runtimeContext(getRuntimeContext())
.build())
.build();
component.inject(this);
// read MapState from snapshot
metadataState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Cluster>("metadataState",
TypeInformation.of(new TypeHint<String>(){}), TypeInformation.of(new TypeHint<Metadata>() {})));
}
@Override
public void flatMap2(Metadata metadata, Collector<Output> collector) throws Exception {
// this should happen only when application starts for first time
// from next time, application will read from snapshot
readMetadataForFirstTime();
// update metadata in MapState
this.metadataState.put(metadata.getId(), metadata);
}
@Override
public void flatMap1(Record record, Collector<Output> collector) throws Exception {
readMetadataForFirstTime();
Metadata metadata = this.metadataState.get(record.getId());
Output output = new Output(record.getId(), metadataState.getName(), metadataState.getVersion(), metadata.getType());
collector.collect(output);
}
private void readMetadataForFirstTime() throws Exception {
if(this.metadataState.iterator().hasNext()) {
// metadataState from snapshot has data
// not reading from table
return;
}
// do this only once
// read metadata from table and add it to MapState
List<Metadata> metadataList = datasource.listAllMetadata();
for(Metadata metadata: metadataList) {
this.metadataState.put(metadata.getid(), metadata);
}
}
}
编辑:其余的应用程序
DataStream<Metadata> metadataKeyedStream =
env.addSource(metadataStream)
.keyBy(Metadata::getId);
SingleOutputStreamOperator<Output> outputStream =
env.addSource(recordStream)
.assignTimestampsAndWatermarks(new RecordTimeExtractor())
.keyBy(Record::getId)
.connect(metadataKeyedStream)
.flatMap(new CustomCoFlatMap());