1

我正在尝试测试一个简单的 Apache Beam 代码,其源代码为 Elasticsearch。我从git repo中找到了 ElasticsearchIO 源类。

我修改了 Beam 的 MinimalWordCount 示例,将源包含为 Elasticsearch 而不是 TextIO。下面是要点,

String[] hosts = new String[1];
hosts[0]="http://localhost:9200";
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(
      ElasticsearchIO.read().withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create(hosts, "test_index", "users").withUsername("esuser").withPassword("password")
      )
 )
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
         for (String word : c.element().split("[^a-zA-Z']+")) { 
           if (!word.isEmpty()) {
             c.output(word);
           }
         }
      }
}));
p.run().waitUntilFinish();

如果我运行代码,

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount \
 -Pdirect-runner

我收到错误

执行 Java 类时发生异常。null:InvocationTargetException:java.io.IOException:等待[10000]毫秒后的侦听器超时

我调试了ElasticsearchIO.java并且可以看到一切正常,Elasicsearch 客户端已构建并且代码正在检索索引中的数据。但是读取转换后的 ParDo 函数根本不会执行。Elasticsearch 客户端一直在等待,最后得到一个超时错误。

我知道 Beam 的 Elasicsearch 连接器仍在开发中。但是任何人都可以帮助找出我做错了什么吗?

PS:我在本地运行 Elasticsearch 5.2.1。

4

5 回答 5

2

当前版本的 ElasticsearchIO (beam-0.6.0 2017/03) 尚不支持 Elasticsearch 版本 5。这方面的进展由https://issues.apache.org/jira/browse/BEAM-1637跟踪

于 2017-03-11T06:00:49.133 回答
1

超时问题确实是由于当前的 Elasticsearch IO 与 ES v5.x 不兼容。但是不清除滚动在 ES 方面是有代价的:段合并过程(从较小的段中创建更大的段并删除较小的段)被保留,因为 ES 在滚动上下文正在使用旧段时无法删除它们。

此外,另一件事:IO.read 将文档返回为 Json,因此拆分 ParDo 也可能在字段名称上拆分,而不仅仅是字段值。

于 2017-03-13T10:40:57.473 回答
0

直接覆盖收到的异常的问题,标题为“从 ElasticsearchIO 等待 [10000] 毫秒后获取侦听器超时”,最近已关闭并显示消息:“已通过添加 ES 5.x 支持来解决(https://github.com /apache/beam/pull/3703)”。

错误现在仍然表现出来吗?

于 2017-09-27T17:21:02.197 回答
0

你可以运行 mvn -X 来获得详细的描述吗?我在弹性论坛上找到了这个 https://discuss.elastic.co/t/es5-indexing-performance-seems-slow/65084/22 https://discuss.elastic.co/t/es5-correct-restclient-失败监听器行为重试逻辑/68211/3

您可能应该通过超时进行更多调查

于 2017-03-10T16:03:48.943 回答
0

问题在于执行删除滚动的代码。如果我在 ElasticsearchIO.java 中注释该代码,则管道运行良好。

restClient.performRequest(
             "DELETE",
             "/_search/scroll",
             Collections.<String, String>emptyMap(),
             entity,
             new BasicHeader("", ""));
于 2017-03-11T03:20:47.440 回答