1

我是 Flink 的新手,并且已经浏览了站点/示例/博客以开始使用。我正在努力正确使用运算符。基本上我有2个问题

问题一:Flink 是否支持声明式异常处理,我需要处理 parse/validate/... 错误?

  • 我可以使用 org.apache.flink.runtime.operators.sort.ExceptionHandler 或类似的东西来处理错误吗?
  • 还是 Rich/FlatMap 功能我最好的选择?如果 Rich/FlatMap 是唯一的选择,那么有没有办法在 Rich/FlatMap 函数中处理 Stream 以便可以附加 Sink 以进行错误处理?

问题 2:我可以有条件地附加不同的接收器吗?

  • 基于键控拆分流中的某些字段,我需要选择不同的接收器,我是再次拆分流还是使用 Rich/FlatMap 来处理?

我正在使用 Flink 1.3.2。这是我工作的相关部分

    .....
    .....
    DataStream<String> eventTextStream = env.addSource(messageSource)

    KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
            // parse, transform or enrich
            .flatMap(new MyParseTransformEnrichFunction())
            .assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
            .keyBy("eventId");

    // split stream based on eventType as different reduce and windowing functions need to be applied
    SplitStream<EventPojo> splitStream = eventPojoStream
            .split(new EventStreamSplitFunction());

    // need to apply reduce function
    DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");

    // need to apply reduce function
    DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");

    // need to apply time based windowing function
    DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");

    ....
    ....

    env.execute("Event Processing");      

我在这里使用正确的运算符吗?

更新1:

尝试使用@alpinegizmo 建议的 ProcessFunction 但这不起作用,因为它取决于我在解析/验证输入之前没有的键控流。我得到“InvalidProgramException:对于非复合类型,字段表达式必须等于'*'或'_'。”。

这是一个非常常见的用例,您的第一个解析/验证输入并且还没有键控流,那么您如何解决它?

感谢您的耐心和帮助。

4

1 回答 1

2

您忽略了一个关键构建块。看看侧面输出

这种机制提供了一种类型安全的方式来生成任意数量的附加输出流。这可能是一种报告错误的干净方式,以及其他用途。在 Flink 1.3 中,侧输出只能与 ProcessFunction 一起使用,但 1.4 会将侧输出添加到 ProcessWindowFunction

于 2017-10-26T09:48:51.563 回答