我是 Flink 的新手,并且已经浏览了站点/示例/博客以开始使用。我正在努力正确使用运算符。基本上我有2个问题
问题一:Flink 是否支持声明式异常处理,我需要处理 parse/validate/... 错误?
- 我可以使用 org.apache.flink.runtime.operators.sort.ExceptionHandler 或类似的东西来处理错误吗?
- 还是 Rich/FlatMap 功能我最好的选择?如果 Rich/FlatMap 是唯一的选择,那么有没有办法在 Rich/FlatMap 函数中处理 Stream 以便可以附加 Sink 以进行错误处理?
问题 2:我可以有条件地附加不同的接收器吗?
- 基于键控拆分流中的某些字段,我需要选择不同的接收器,我是再次拆分流还是使用 Rich/FlatMap 来处理?
我正在使用 Flink 1.3.2。这是我工作的相关部分
.....
.....
DataStream<String> eventTextStream = env.addSource(messageSource)
KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
// parse, transform or enrich
.flatMap(new MyParseTransformEnrichFunction())
.assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
.keyBy("eventId");
// split stream based on eventType as different reduce and windowing functions need to be applied
SplitStream<EventPojo> splitStream = eventPojoStream
.split(new EventStreamSplitFunction());
// need to apply reduce function
DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");
// need to apply reduce function
DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");
// need to apply time based windowing function
DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");
....
....
env.execute("Event Processing");
我在这里使用正确的运算符吗?
更新1:
尝试使用@alpinegizmo 建议的 ProcessFunction 但这不起作用,因为它取决于我在解析/验证输入之前没有的键控流。我得到“InvalidProgramException:对于非复合类型,字段表达式必须等于'*'或'_'。”。
这是一个非常常见的用例,您的第一个解析/验证输入并且还没有键控流,那么您如何解决它?
感谢您的耐心和帮助。