我正在尝试测试一个简单的 Apache Beam 代码,其源代码为 Elasticsearch。我从git repo中找到了 ElasticsearchIO 源类。
我修改了 Beam 的 MinimalWordCount 示例,将源包含为 Elasticsearch 而不是 TextIO。下面是要点,
String[] hosts = new String[1];
hosts[0]="http://localhost:9200";
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(
ElasticsearchIO.read().withConnectionConfiguration(
ElasticsearchIO.ConnectionConfiguration.create(hosts, "test_index", "users").withUsername("esuser").withPassword("password")
)
)
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}));
p.run().waitUntilFinish();
如果我运行代码,
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount \
-Pdirect-runner
我收到错误
执行 Java 类时发生异常。null:InvocationTargetException:java.io.IOException:等待[10000]毫秒后的侦听器超时
我调试了ElasticsearchIO.java并且可以看到一切正常,Elasicsearch 客户端已构建并且代码正在检索索引中的数据。但是读取转换后的 ParDo 函数根本不会执行。Elasticsearch 客户端一直在等待,最后得到一个超时错误。
我知道 Beam 的 Elasicsearch 连接器仍在开发中。但是任何人都可以帮助找出我做错了什么吗?
PS:我在本地运行 Elasticsearch 5.2.1。