是的,水槽不能将事件拆分为多个。这是我对这种方法的替代解决方案,以 Kafka 源为例。
首先实现一个扩展Kafka源的源类,替换默认的ChannelProcessor对象。
public class XXXSplitSource extends KafkaSource {
@Override
public synchronized ChannelProcessor getChannelProcessor()
{
return new XXXYourChannelProcessorProxy(super.getChannelProcessor());
}
}
然后,在 ChannelProcessor 代理实现中,您可以使用自定义函数拆分事件。
public class XXXYourChannelProcessorProxy extends ChannelProcessor {
public ChannelProcessor m_downstreamChannelProcessor = null;
public XXXYourChannelProcessorProxy (ChannelSelector selector) {
super(selector);
}
public XXXYourChannelProcessorProxy (ChannelProcessor processor) {
super(null);
m_downstreamChannelProcessor = processor;
}
@Override
public void processEventBatch(List<Event> events) {
List<Event> generatedEvents = YOUR_SPLIT_FUNCTION_HERE(events);
m_downstreamChannelProcessor.processEventBatch(generatedEvents);
}
}