
I have a Spark job that reads data from Cassandra, processes/transforms/filters it, and writes the results to Elasticsearch. I use Docker for integration tests, and I am running into trouble with the write from Spark to Elasticsearch.

Dependencies:

"joda-time"              % "joda-time"          % "2.9.4",
"javax.servlet"          %  "javax.servlet-api" % "3.1.0",
"org.elasticsearch"      %  "elasticsearch"     % "2.3.2",
"org.scalatest"          %% "scalatest"         % "2.2.1",
"com.github.nscala-time" %% "nscala-time"       % "2.10.0",
"cascading"              %   "cascading-hadoop" % "2.6.3",
"cascading"              %   "cascading-local"  % "2.6.3",
"com.datastax.spark"     %% "spark-cassandra-connector" % "1.4.2",
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5",
"org.elasticsearch"      %  "elasticsearch-hadoop"      % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")),
"org.apache.spark"       %% "spark-catalyst"            % "1.4.0" % "provided"

In my unit tests, I can connect to Elasticsearch with a TransportClient to set up my template and index. In other words, this works:

import java.net.InetAddress
import scala.io.Source
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.transport.InetSocketTransportAddress

// The es.* settings are read by elasticsearch-hadoop, which talks HTTP on port 9200.
val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
  .set("spark.cassandra.input.split.size_in_mb", "67108864")
  .set("spark.cassandra.connection.host", cassandraHostString)
  .set("es.nodes", elasticsearchHostString)
  .set("es.port", "9200")
  .set("http.publish_host", "")
sc = new SparkContext(conf)

// The TransportClient uses the native transport protocol on port 9300, not HTTP on 9200.
esClient = TransportClient.builder().build()
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticsearchHostString), 9300))
esClient.admin().indices().preparePutTemplate(testTemplate).setSource(Source.fromInputStream(getClass.getResourceAsStream("/mytemplate.json")).mkString).execute().actionGet()
esClient.admin().indices().prepareCreate(esTestIndex).execute().actionGet()
esClient.admin().indices().prepareAliases().addAlias(esTestIndex, "hot").execute().actionGet()

However, when I try to run the following, the write fails:

EsSpark.saveToEs(
  myRDD,
  "hot/mytype",
  Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id")
)
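
For reference, here is a minimal sketch of an RDD shape that is compatible with the es.mapping.id and es.mapping.parent options above. The case class and values are illustrative assumptions; the real myRDD is read from Cassandra and is not shown here.

// Illustrative only: the field names must match the es.mapping.* options ("id" and "parent_id").
case class MyDoc(id: String, parent_id: String, payload: String)

val myRDD = sc.parallelize(Seq(
  MyDoc("1", "p1", "hello"),
  MyDoc("2", "p1", "world")
))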

This is the stack trace I get:

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/08 12:30:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
(same stack trace as above)

I can verify that it is trying to connect to the correct IP address using 'docker network inspect bridge':

docker network inspect bridge
[
{
    "Name": "bridge",
    "Id": "ef184e3be3637be28f854c3278f1c8647be822a9413120a8957de6d2d5355de1",
    "Scope": "local",
    "Driver": "bridge",
    "EnableIPv6": false,
    "IPAM": {
        "Driver": "default",
        "Options": null,
        "Config": [
            {
                "Subnet": "172.17.0.0/16",
                "Gateway": "172.17.0.1"
            }
        ]
    },
    "Internal": false,
    "Containers": {
        "0c79680de8ef815bbe4bdd297a6f845cce97ef18bb2f2c12da7fe364906c3676": {
            "Name": "analytics_rabbitmq_1",
            "EndpointID": "3f03fdabd015fa1e2af802558aa59523f4a3c8c72f1231d07c47a6c8e60ae0d4",
            "MacAddress": "02:42:ac:11:00:04",
            "IPv4Address": "172.17.0.4/16",
            "IPv6Address": ""
        },
        "9b1f37c8df344c50e042c4b3c75fcb2774888f93fd7a77719fb286bb13f76f38": {
            "Name": "analytics_elasticsearch_1",
            "EndpointID": "fb083d27aaf8c0db1aac90c2a1ea2f752c46d8ac045e365f4b9b7d1651038a56",
            "MacAddress": "02:42:ac:11:00:02",
            "IPv4Address": "172.17.0.2/16",
            "IPv6Address": ""
        },
        "ed0cfad868dbac29bda66de6bee93e7c8caf04d623d9442737a00de0d43c372a": {
            "Name": "analytics_cassandra_1",
            "EndpointID": "2efa95980d681b3627a7c5e952e2f01980cf5ffd0fe4ba6185b2cab735784df6",
            "MacAddress": "02:42:ac:11:00:03",
            "IPv4Address": "172.17.0.3/16",
            "IPv6Address": ""
        }
    },
    "Options": {
        "com.docker.network.bridge.default_bridge": "true",
        "com.docker.network.bridge.enable_icc": "true",
        "com.docker.network.bridge.enable_ip_masquerade": "true",
        "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
        "com.docker.network.bridge.name": "docker0",
        "com.docker.network.driver.mtu": "1500"
    },
    "Labels": {}
}
]

I am running everything locally on a MacBook/OS X. I have no idea why I can connect to the Docker container with the TransportClient and through my browser, but the EsSpark.saveToEs(...) function always fails.


1 Answer


This problem can be fixed by setting

.config("es.nodes.wan.only", "true")

in the Spark configuration.
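
Applied to the SparkConf-style setup from the question, this is a sketch of the equivalent (the .config call above belongs to the SparkSession builder API; .set is the SparkConf counterpart; cassandraHostString and elasticsearchHostString are the same values the question uses):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
  .set("spark.cassandra.connection.host", cassandraHostString)
  .set("es.nodes", elasticsearchHostString)
  .set("es.port", "9200")
  // Disable node discovery: with wan.only the connector talks only to the
  // declared es.nodes, instead of the container-internal addresses
  // (e.g. 172.17.0.2) that the cluster advertises.
  .set("es.nodes.wan.only", "true")
val sc = new SparkContext(conf)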

es.nodes.wan.only

(default false) Whether the connector is used against an Elasticsearch instance in a cloud or otherwise restricted environment over the WAN. In this mode, the connector disables node discovery and only connects through the declared es.nodes during all operations, including reads and writes. Note that performance is highly affected in this mode.

That is what matters here: by default elasticsearch-hadoop discovers the cluster's nodes and then connects to the addresses they advertise, which in this setup is the container-internal bridge address 172.17.0.2, unreachable from the OS X host. The TransportClient and the browser most likely work because they only use the declared, published address; with es.nodes.wan.only enabled, the connector does the same.
