我有从 Cassandra 读取数据、处理/转换/过滤数据并将结果写入 Elasticsearch 的 spark 作业。我使用 docker 进行集成测试,但在从 spark 到 Elasticsearch 的编写过程中遇到了麻烦。
依赖项:
"joda-time" % "joda-time" % "2.9.4",
"javax.servlet" % "javax.servlet-api" % "3.1.0",
"org.elasticsearch" % "elasticsearch" % "2.3.2",
"org.scalatest" %% "scalatest" % "2.2.1",
"com.github.nscala-time" %% "nscala-time" % "2.10.0",
"cascading" % "cascading-hadoop" % "2.6.3",
"cascading" % "cascading-local" % "2.6.3",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.2",
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5",
"org.elasticsearch" % "elasticsearch-hadoop" % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")),
"org.apache.spark" %% "spark-catalyst" % "1.4.0" % "provided"
在我的单元测试中,我可以使用 TransportClient 连接到 elasticsearch 来设置我的模板和索引
又名。这有效
val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
.set("spark.cassandra.input.split.size_in_mb", "67108864")
.set("spark.cassandra.connection.host", cassandraHostString)
.set("es.nodes", elasticsearchHostString)
.set("es.port", "9200")
.set("http.publish_host", "")
sc = new SparkContext(conf)
esClient = TransportClient.builder().build()
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticsearchHostString), 9300))
esClient.admin().indices().preparePutTemplate(testTemplate).setSource(Source.fromInputStream(getClass.getResourceAsStream("/mytemplate.json")).mkString).execute().actionGet()
esClient.admin().indices().prepareCreate(esTestIndex).execute().actionGet()
esClient.admin().indices().prepareAliases().addAlias(esTestIndex, "hot").execute().actionGet()
但是,当我尝试运行时
EsSpark.saveToEs(
myRDD,
"hot/mytype",
Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id")
)
我收到这个堆栈跟踪
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/08 12:30:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我可以使用 'docker network inspect bridge 验证它正在尝试连接到正确的 IP 地址。
docker network inspect bridge
[
{
"Name": "bridge",
"Id": "ef184e3be3637be28f854c3278f1c8647be822a9413120a8957de6d2d5355de1",
"Scope": "local",
"Driver": "bridge",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": [
{
"Subnet": "172.17.0.0/16",
"Gateway": "172.17.0.1"
}
]
},
"Internal": false,
"Containers": {
"0c79680de8ef815bbe4bdd297a6f845cce97ef18bb2f2c12da7fe364906c3676": {
"Name": "analytics_rabbitmq_1",
"EndpointID": "3f03fdabd015fa1e2af802558aa59523f4a3c8c72f1231d07c47a6c8e60ae0d4",
"MacAddress": "02:42:ac:11:00:04",
"IPv4Address": "172.17.0.4/16",
"IPv6Address": ""
},
"9b1f37c8df344c50e042c4b3c75fcb2774888f93fd7a77719fb286bb13f76f38": {
"Name": "analytics_elasticsearch_1",
"EndpointID": "fb083d27aaf8c0db1aac90c2a1ea2f752c46d8ac045e365f4b9b7d1651038a56",
"MacAddress": "02:42:ac:11:00:02",
"IPv4Address": "172.17.0.2/16",
"IPv6Address": ""
},
"ed0cfad868dbac29bda66de6bee93e7c8caf04d623d9442737a00de0d43c372a": {
"Name": "analytics_cassandra_1",
"EndpointID": "2efa95980d681b3627a7c5e952e2f01980cf5ffd0fe4ba6185b2cab735784df6",
"MacAddress": "02:42:ac:11:00:03",
"IPv4Address": "172.17.0.3/16",
"IPv6Address": ""
}
},
"Options": {
"com.docker.network.bridge.default_bridge": "true",
"com.docker.network.bridge.enable_icc": "true",
"com.docker.network.bridge.enable_ip_masquerade": "true",
"com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
"com.docker.network.bridge.name": "docker0",
"com.docker.network.driver.mtu": "1500"
},
"Labels": {}
}
]
我在 macbook/osx 上本地运行所有内容。我不知道为什么我可以使用 TransportClient 并通过我的浏览器连接到 docker 容器,但是函数 EsSpark.saveToES(...) 总是失败。