11

有点卡住了CoFlatMapFunction。如果我将它放在DataStream前面的窗口上似乎可以正常工作,但如果放在窗口的“应用”功能之后会失败。

我正在测试两个流,主要“功能”用于flatMap1不断摄取数据,控制流“模型”用于flatMap2根据请求更改模型。

我能够设置并看到 b0/b1 正确设置flatMap2,但flatMap1总是看到 b0 和 b1 在初始化时设置为 0。

我在这里遗漏了一些明显的东西吗?

public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel(){
        b0=0.0;
        b1=0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " + this + "\n");
    }

    @Override
    public String toString(){
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}
4

1 回答 1

4

这是邮件列表中的答案...

CoFlatMapFunction 是否打算并行执行?

如果是,您需要某种方法来确定性地分配哪个记录到哪个并行实例。在某种程度上,CoFlatMapFunction 在模型和会话窗口的结果之间进行并行(分区)连接,因此您需要某种形式的键来选择元素进入哪个分区。那有意义吗?

如果不是,请尝试将其显式设置为并行度 1。

问候,斯蒂芬


所有人都可以只读访问的全局状态可以通过广播()实现。

可供所有人读取和更新的全局状态当前不可用。对此的一致操作将非常昂贵,需要某种形式的分布式通信/共识。

相反,我鼓励您使用以下方法:

1)如果您可以对状态进行分区,请使用 keyBy().mapWithState() - 本地化状态操作并使其非常快。

2)如果你的 state 不是 key 组织的,那么你的 state 可能很小,你也许可以使用非并行操作。

3) 如果某个操作更新了状态并且另一个操作访问它,您通常可以通过迭代和 CoFlatMapFunction 来实现它(一侧是原始输入,另一侧是反馈输入)。

最后,所有方法都将状态访问和修改本地化,如果可能的话,这是一个很好的模式。

问候,斯蒂芬

于 2015-11-17T14:52:32.657 回答