我想实现一个具有两个输入流并从每个流中获取一个项目以同时处理这两个输入流的运算符,例如连接。此外,如果两个输入之一没有任何数据,则操作员将阻塞并等待它。
如果我必须这样做,涉及哪些课程?关于它的教程要好得多。任何建议将不胜感激!
我想实现一个具有两个输入流并从每个流中获取一个项目以同时处理这两个输入流的运算符,例如连接。此外,如果两个输入之一没有任何数据,则操作员将阻塞并等待它。
如果我必须这样做,涉及哪些课程?关于它的教程要好得多。任何建议将不胜感激!
您需要连接两个DataStream
并应用TwoInputStreamOperator
. 已经有一堆预定义的运算符。在您的情况下, aCoFlatMapFunction
将是一个不错的选择:
DataStream input1 = ...
DataStream input2 = ...
input1.connect(input2).flatMap(new MyOwnCoFlatMapFunction());
这里有更多细节:https ://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#co-operators
但是,此运算符不能按您的意愿进行阻止。因此,您需要应用以下模式:每次收到来自左侧或右侧的输入时,如果没有来自另一侧的输入可用,则需要缓冲输入:
MyOwnCoFlatMapFunction implements CoFlatMapFunction {
List<IN> leftInput = new LinkedList<IN>();
List<IN> rightInput = new LinkedList<IN>();
void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
if(rightInput.size() > 0) {
IN right = rightInput.remove();
// process left input (value) and right input (right) together
} else {
leftInput.add(value);
}
}
// reverse pattern for flatMap2 here
}
但是,您需要注意阻塞在流处理中是危险的。如果您的输入流具有不同的日期速率,则此方法将不起作用(!),因为较慢的流会限制较快的流,从而导致较快的流产生背压。我不知道你的用例,但它似乎是“错误的”。为什么不能按时加入?