3

我想ProcessWindowFunction在我的 Apache Flink 项目中使用 a 。但是我在使用过程功能时遇到了一些错误,请参见下面的代码片段

错误是:

WindowedStream,Tuple,TimeWindow> 类型中的方法 process(ProcessWindowFunction,R,Tuple,TimeWindow>) 不适用于参数 (JDBCExample.MyProcessWindows)

我的程序:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

我的ProcessWindowFunction

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}
4

2 回答 2

5

问题可能是ProcessWindowFunction.

keyBy(0)您正在按位置 ( )引用密钥。因此,编译器无法推断其类型 ( String),您需要将其更改ProcessWindowFunction为:

private class MyProcessWindows 
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>

通过替换StringTuple您现在有了一个通用的键占位符,Tuple1<String>当您需要访问processElement()方法中的键时,您可以将其转换为:

public void process(
    Tuple key, 
    Context context, 
    Iterable<Tuple2<String, JSONObject>> input, 
    Collector<Tuple2<String, String>> out) throws Exception {

  String sKey = (String)((Tuple1)key).f0;
  ...
}

KeySelector<IN, KEY>如果您定义一个函数来提取密钥,您可以避免强制转换并使用正确的类型,因为编译器知道返回类型KEYKeySelector

于 2018-03-20T07:28:56.313 回答
3

Fabian 所说的 :) 使用Tuple应该可以,但在你的ProcessWindowFunction. 使用 aKeySelector很容易,而且代码更简洁。例如

.keyBy(new KeySelector<Tuple2<String,JsonObject>, String>() {

    @Override
    public String getKey(Tuple2<String, JsonObject> in) throws Exception {
        return in.f0;
    }
})

然后上面让你定义一个ProcessWindowFunctionlike:

public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JsonObject>, Tuple2<String, String>, String, TimeWindow> {
于 2018-03-22T03:14:07.883 回答