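I am trying to perform a stream-stream join in a Flink v1.11 application on KDA. The join works with ProcessingTime, but with EventTime I do not see any output records from Flink.
Here is my EventTime processing code, which does not work: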
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<Trade> input1 = createSourceFromInputStreamName1(env)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Trade>forMonotonousTimestamps()
                                .withTimestampAssigner(((event, l) -> event.getEventTime()))
                );
        DataStream<Company> input2 = createSourceFromInputStreamName2(env)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Company>forMonotonousTimestamps()
                                .withTimestampAssigner(((event, l) -> event.getEventTime()))
                );
        DataStream<String> joinedStream = input1.join(input2)
                .where(new TradeKeySelector())
                .equalTo(new CompanyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .apply(new JoinFunction<Trade, Company, String>() {
                    @Override
                    public String join(Trade t, Company c) {
                        return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
                    }
                });
        joinedStream.addSink(createS3SinkFromStaticConfig());
        env.execute("Flink S3 Streaming Sink Job");
    }
I have a similar join that works with ProcessingTime:
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStream<Trade> input1 = createSourceFromInputStreamName1(env);
        DataStream<Company> input2 = createSourceFromInputStreamName2(env);
        DataStream<String> joinedStream = input1.join(input2)
                .where(new TradeKeySelector())
                .equalTo(new CompanyKeySelector())
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(10000)))
                .apply(new JoinFunction<Trade, Company, String>() {
                    @Override
                    public String join(Trade t, Company c) {
                        return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
                    }
                });
        joinedStream.addSink(createS3SinkFromStaticConfig());
        env.execute("Flink S3 Streaming Sink Job");
    }
Sample records from the two streams I am trying to join:
{'eventTime': 1611773705, 'ticker': 'TBV', 'price': 71.5}
{'eventTime': 1611773705, 'ticker': 'TBV', 'name': 'The Bavaria'}
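
One thing that may be worth checking, judging only from the sample records: the eventTime value 1611773705 looks like epoch seconds, while Flink interprets the timestamps returned by a timestamp assigner as epoch milliseconds. The sketch below is plain Java with no Flink dependency. The class name EventTimeUnits and its helper methods are made up for illustration, and the assumption that getEventTime() returns the raw value from the record is mine. It shows how far event time has to advance before the 30-second tumbling window containing the sample record can end under each interpretation.

```java
import java.time.Instant;

public class EventTimeUnits {

    // Start of the zero-offset tumbling window that contains the timestamp:
    // round the timestamp down to a multiple of the window size.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // How far event time must advance past this timestamp to reach the window end.
    static long untilWindowEnd(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize - timestamp;
    }

    public static void main(String[] args) {
        long eventTime = 1611773705L; // value taken from the sample records
        long windowSize = 30_000L;    // Time.seconds(30) expressed in milliseconds

        // Read as milliseconds, the value falls in January 1970.
        System.out.println(Instant.ofEpochMilli(eventTime));
        // Read as seconds (assumption), it falls in January 2021.
        System.out.println(Instant.ofEpochSecond(eventTime));

        // Raw value used as milliseconds: the window end is 6295 units away.
        System.out.println(untilWindowEnd(eventTime, windowSize));
        // Value converted to milliseconds first: the window end is 25000 ms away.
        System.out.println(untilWindowEnd(eventTime * 1000L, windowSize));
    }
}
```

If the values really are in seconds, each second of real data moves event time forward by only one millisecond, so the window holding the sample record would need roughly 6295 seconds of further data before it could fire, which would look like no output at all. Under that assumption, returning event.getEventTime() * 1000L from both timestamp assigners would be the change to try.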