我正在使用 Flume 1.4.0,并且试图以某种方式停止 Flume 的组件:
- 首先,停止来源。
- 然后等到通道内的所有事件都被接收器消耗掉。
- 消费完所有事件后,停止通道和接收器。
上述任务是由一个关闭钩子执行的,就像在中创建的一样org.apache.flume.node.Application
(实际上,我正在开发一个自定义的Application
)。
我获得对源、通道和接收器的引用的方式是:
MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
关键是我得到了这个NullPointerException
:
2015-02-17 16:03:28,094 (agent-shutdown-hook) [ERROR - org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:169)] Error while stopping HTTPSource. Exception follows.
java.lang.NullPointerException
at org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:165)
at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.stopSources(CygnusApplication.java:296)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.run(CygnusApplication.java:231)
HTTPSource.java:165
是关于停止 Jetty 服务器实现源的 Http 服务器部分,这似乎是空的:
162 @Override
163 public void stop() {
164 try {
165 srv.stop();
166 srv.join();
167 srv = null;
168 } catch (Exception ex) {
169 LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
170 }
171 sourceCounter.stop();
172 LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
173 }
为什么它是空的?源工作正常,并且能够接收 Http 请求。
我猜这不是关闭 Flume 组件的正确方法……如果不是,那是什么?
谢谢!