0

我正在尝试在 KDA 上使用 Flink v1.11 应用程序执行流-流连接。加入 wrtProcessingTime工作,但EventTime我没有看到 Flink 的任何输出记录。

这是我的 EventTime 处理代码,它不起作用,

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<Trade> input1 = createSourceFromInputStreamName1(env)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Trade>forMonotonousTimestamps()
                            .withTimestampAssigner(((event, l) -> event.getEventTime()))
            );
    DataStream<Company> input2 = createSourceFromInputStreamName2(env)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Company>forMonotonousTimestamps()
                            .withTimestampAssigner(((event, l) -> event.getEventTime()))
            );
    DataStream<String> joinedStream = input1.join(input2)
            .where(new TradeKeySelector())
            .equalTo(new CompanyKeySelector())
            .window(TumblingEventTimeWindows.of(Time.seconds(30)))
            .apply(new JoinFunction<Trade, Company, String>() {
                @Override
                public String join(Trade t, Company c) {
                    return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
                }
            });
    joinedStream.addSink(createS3SinkFromStaticConfig());
    env.execute("Flink S3 Streaming Sink Job");
}

我有一个与 ProcessingTime 合作的类似加入

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    DataStream<Trade> input1 = createSourceFromInputStreamName1(env);
    DataStream<Company> input2 = createSourceFromInputStreamName2(env);
    DataStream<String> joinedStream = input1.join(input2)
            .where(new TradeKeySelector())
            .equalTo(new CompanyKeySelector())
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(10000)))
            .apply (new JoinFunction<Trade, Company, String> (){
                @Override
                public String join(Trade t, Company c) {
                    return t.getEventTime() + ", " + t.getTicker() + ", " + c.getName() + ", " + t.getPrice();
                }
            });
    joinedStream.addSink(createS3SinkFromStaticConfig());
    env.execute("Flink S3 Streaming Sink Job");
}

我尝试加入的两个流中的示例记录:

{'eventTime': 1611773705, 'ticker': 'TBV', 'price': 71.5}
{'eventTime': 1611773705, 'ticker': 'TBV', 'name': 'The Bavaria'}
4

1 回答 1

0

我没有看到任何明显错误,但以下任何一项都可能导致此作业不产生任何输出:

  • 水印问题。例如,如果其中一个流变得空闲,则水印将停止前进。或者如果窗口之后没有事件,则水印将不会前进到足以关闭该窗口。或者,如果时间戳实际上不是按升序排列(使用该forMonotonousTimestamps策略,事件应该按时间戳排序),管道可能会默默地丢弃所有乱序事件。
  • StreamingFileSink 仅在检查点期间完成其输出,并且在作业停止时不会完成任何待处理的文件。
  • 窗口连接的行为类似于内部连接,并且需要来自每个输入流的至少一个事件才能在给定的窗口间隔内产生任何结果。从您分享的示例来看,这似乎不是问题所在。

更新:

鉴于您(似乎)想要做的是将每个交易与交易时可用的最新公司记录连接起来,查找连接或临时表连接似乎是不错的方法。

这里有几个例子:

https://github.com/ververica/flink-sql-cookbook/blob/master/joins/04/04_lookup_joins.md

https://github.com/ververica/flink-sql-cookbook/blob/master/joins/03/03_kafka_join.md

一些文档:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#event-time-temporal-join

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/versioned_tables.html

于 2021-01-29T13:04:13.330 回答