有点卡住了CoFlatMapFunction
。如果我将它放在DataStream
前面的窗口上似乎可以正常工作,但如果放在窗口的“应用”功能之后会失败。
我正在测试两个流,主要“功能”用于flatMap1
不断摄取数据,控制流“模型”用于flatMap2
根据请求更改模型。
我能够设置并看到 b0/b1 正确设置flatMap2
,但flatMap1
总是看到 b0 和 b1 在初始化时设置为 0。
我在这里遗漏了一些明显的东西吗?
public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
private static final long serialVersionUID = 1L;
Double b0;
Double b1;
public applyModel(){
b0=0.0;
b1=0.0;
}
@Override
public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
System.out.print("Main: " + this + "\n");
}
@Override
public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
System.out.print("Old Model: " + this + "\n");
b0 = value.getB0();
b1 = value.getB1();
System.out.print("New Model: " + this + "\n");
}
@Override
public String toString(){
return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
}
}