0

我正在阅读 2 个流。一个带有记录,一个带有元数据。

我第一次希望我的应用程序通过扫描完整的表来构建元数据并将其保存到 Flink 的 MapState。表上的更新将通过元数据流捕获,并且 MapState 将相应地更新。

从第二次开始,我想使用 MapState 而不是读取整个表。

下面是我对这个功能的实现,但是我的 MapState 总是空的,我在这里做错了吗?

public class CustomCoFlatMap extends RichCoFlatMapFunction<Record, Metadata, Output> {

    private transient DataSource datasource;
    private transient MapState<String, Metadata> metadataState;

    @Inject
    public void setDataSource(DataSource datasource) {
        this.datasource = datasource;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        final RichFunctionComponent component = DaggerRichFunctionComponent.builder()
                .richFunctionModule(RichFunctionModule.builder()
                        .runtimeContext(getRuntimeContext())
                        .build())
                .build();
        component.inject(this);

        // read MapState from snapshot
        metadataState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Cluster>("metadataState",
                TypeInformation.of(new TypeHint<String>(){}), TypeInformation.of(new TypeHint<Metadata>() {})));
    }

    @Override
    public void flatMap2(Metadata metadata, Collector<Output> collector) throws Exception {
        // this should happen only when application starts for first time
        // from next time, application will read from snapshot
        readMetadataForFirstTime();

        // update metadata in MapState
        this.metadataState.put(metadata.getId(), metadata);
    }

    @Override
    public void flatMap1(Record record, Collector<Output> collector) throws Exception {
        
        readMetadataForFirstTime();
        
        Metadata metadata = this.metadataState.get(record.getId());

        Output output = new Output(record.getId(), metadataState.getName(), metadataState.getVersion(), metadata.getType());
    
        collector.collect(output);
    }

    private void readMetadataForFirstTime() throws Exception {
        if(this.metadataState.iterator().hasNext()) {
            // metadataState from snapshot has data
            // not reading from table
            return;
        }

        // do this only once
        // read metadata from table and add it to MapState
        List<Metadata> metadataList = datasource.listAllMetadata();
        for(Metadata metadata: metadataList) {
            this.metadataState.put(metadata.getid(), metadata);
        }

    }
}

编辑:其余的应用程序

DataStream<Metadata> metadataKeyedStream =
                env.addSource(metadataStream)
                        .keyBy(Metadata::getId);

SingleOutputStreamOperator<Output> outputStream =
                env.addSource(recordStream)
                        .assignTimestampsAndWatermarks(new RecordTimeExtractor())
                        .keyBy(Record::getId)
                        .connect(metadataKeyedStream)
                        .flatMap(new CustomCoFlatMap());
4

1 回答 1

1

MapState是一种键分区状态——这意味着 Flink 为Map<String, Metadata>输入流中的每个不同键维护一个单独的状态。readMetadataForFirstTime必须为您正在处理的流中的每个键读取并插入其数据,因为每个键RichCoFlatMapFunction都有一个单独的映射。

您可能希望以不同的方式处理此问题,具体取决于您要执行的操作。例如,如果您只想为源流中的每个键存储一个值,那么您应该使用ValueState而不是MapState. 您可以将其ValueState视为分片键/值存储,其中有状态运算符(例如 a RichCoFlatMapFunction)的每个并行实例都将具有键空间切片的值。MapState适用于您需要为每个键而不是单个对象存储整个哈希图的情况。

(如果我误判了问题所在,请分享更多上下文以显示应用程序的其余部分如何使用它RichCoFlatMapFunction。)

于 2020-11-21T11:11:56.160 回答