我对 Apache Flink Streaming API 有疑问。
我可以设法使用自定义数据源设置整个 CEP 环境,并且在该源上使用标准接收器(如“print()”)时,一切正常。
这是我的水槽现在的样子:
@RequiredArgsConstructor
public class EventDataConsumer extends RichSinkFunction<EventData>{
private final transient Consumer<EventData> consumer;
@Override
public void invoke(EventData eventData) throws Exception {
consumer.accept(eventData);
}
}
我试图实现的目标是将方法引用传递给这个 SinkFunction,它将为我的 DataStream 中的每个元素执行。
这就是我初始化 SinkFunction 的方式:
EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData);
outStream.addSink(consumer);
我的问题是,当我在自定义接收器的“调用”方法中设置断点时,即使我显式调用构造函数来分配消费者,消费者也似乎为空。