2

我正在使用 Flume 1.4.0,并且试图以某种方式停止 Flume 的组件:

  • 首先,停止来源。
  • 然后等到通道内的所有事件都被接收器消耗掉。
  • 消费完所有事件后,停止通道和接收器。

上述任务是由一个关闭钩子执行的,就像在中创建的一样org.apache.flume.node.Application(实际上,我正在开发一个自定义的Application)。

我获得对源、通道和接收器的引用的方式是:

MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();

关键是我得到了这个NullPointerException

2015-02-17 16:03:28,094 (agent-shutdown-hook) [ERROR - org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:169)] Error while stopping HTTPSource. Exception follows.
java.lang.NullPointerException
    at org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:165)
    at     org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
    at     es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.stopSources(CygnusApplication.java:296)
    at     es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.run(CygnusApplication.java:231)

HTTPSource.java:165是关于停止 Jetty 服务器实现源的 Http 服务器部分,这似乎是空的:

162  @Override
163  public void stop() {
164    try {
165      srv.stop();
166      srv.join();
167      srv = null;
168    } catch (Exception ex) {
169      LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
170    }
171    sourceCounter.stop();
172    LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
173  }

为什么它是空的?源工作正常,并且能够接收 Http 请求。

我猜这不是关闭 Flume 组件的正确方法……如果不是,那是什么?

谢谢!

4

2 回答 2

1

原因是 srv 被多个线程共享(因此它是 volatile 声明)。Flume 尝试调用 close 来终止源,并且这种情况不止一次发生。对 stop() 的第二次调用失败,因为 srv 已被取消。

在您的情况下发生这种情况而不是在标准香草水槽代理中发生这种情况的原因可能是因为您尚未更新 SourceCounter。查看 MonitoredCounterGroup 了解详细信息。

于 2015-03-03T19:49:03.123 回答
1

固定的。感谢 Erik 的指点,我调试了代码,直到我意识到每次configurationProvider.getConfiguration()调用一个句子时MaterializedConfiguration都会创建一个新句子。这样的物化配置是一整套正在运行的源、通道和接收器。因此,我有几个相同来源的副本……哎呀!然而,不知何故,Flume 足够聪明,可以检测到配置的多重实现,我已经看到它关闭了所有重复的组件……但这包括 Jetty 服务器的 volatile 变量等等。

因此,而不是这样做:

MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();

现在我得到了我需要的参考资料handleConfigurationEvent(MaterializedConfiguration conf)(它被覆盖了):

@Override
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    sourcesRef = conf.getSourceRunners();
    channelsRef = conf.getChannels();
    sinksRef = conf.getSinkRunners();
    super.handleConfigurationEvent(conf);
} // handleConfigurationEvent

再次感谢埃里克!

于 2015-03-04T14:24:08.043 回答