问题标签 [elasticsearch-hadoop]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 在 Spark 的 Elastic Search 中插入地理数据
我尝试在我的 ES 中上传带有纬度和经度字段的 RDD。我想使用 geo_point 类型将它们绘制在地图上。我尝试为每个文档创建一个“位置”字段,其中包含一个字符串,如“12.25,-5.2”或一个包含两个双精度数的数组,用于纬度/经度,但 ES 没有将它们检测为 geo_point。在我插入数据之前索引不存在。
我如何告诉 ES 该位置是一个地理点?
使用 elasticsearch-hadoop 库存储的当前代码:
使用myRDD一个包含“位置”的 RDD[Map] -> [double, double]
并且someConf包含"es.index.auto.create" -> "true"
hadoop - 从 hdfs 、集群设置和使用中获取 elasticsearch 中的数据
我正在建立一个火花簇。我在同一实例上有 hdfs 数据节点和 spark 主节点。
当前设置是 1-master(spark 和 hdfs)6-spark worker 和 hdfs 数据节点
所有实例都是相同的,16gig 双核(不幸的是)。
我还有 3 台机器,同样的规格。现在我有三个选择 1. 在这 3 台机器上部署 es 就可以了。集群看起来像 1-master(spark 和 hdfs) 6-spark worker 和 hdfs 数据节点 3-elasticsearch 节点
- 在 1 上部署 es master,在所有其他上扩展 spark 和 hdfs 和 es。集群看起来像 1-master(spark 和 hdfs) 1-master elasticsearch 8-spark worker、hdfs 数据节点、es 数据节点
我的应用程序大量使用 spark 进行连接、ml 等,但我们正在寻找搜索功能。搜索我们绝对不需要实时,长达 30 分钟的刷新间隔对我们来说甚至很好。
同时spark集群除了es索引之外还有其他长时间运行的任务。
解决方案不必是上述之一,如果有人建议,我愿意进行实验。一旦结束,其他开发人员也会很方便。
我也在尝试使用 es hadoop、es-spark 项目,但如果我做 3 个专用节点,我觉得摄取非常慢,就像每分钟 60 万条记录。
python - Pyspark 将 rdd 转换为具有空值的数据帧
我正在使用 pyspark (1.6) 和 elasticsearch-hadoop (5.1.1)。我通过以下方式将我的数据从 elasticsearch 转换为 rdd 格式:
这里的 es_read_conf 只是我的 ES 集群的字典,作为 sc 的 SparkContext 对象。这很好用,我得到了 rdd 对象。
我想将其转换为数据框
但我得到了错误:
给 toDF 方法一个 sampleSize 会导致同样的错误。据我了解,这是因为 pyspark 无法确定每个字段的类型。我知道我的弹性搜索集群中有一些字段都是空的。
将其转换为数据框的最佳方法是什么?
scala - 无法使用 es-hadoop 在 elasticsearch 中创建外部表
我正在运行一个简单的 spark-submit 作业,例如:
当我们在 Hive 界面中创建外部表并将数据从 hive 表加载到外部表中时,ES-hadoop 工作正常。当我们在 jar 中包含相同的查询时,它不起作用。当我们创建普通的 hive 表时,相同的 jar 文件可以正常工作。当我们在 jar 文件中包含外部表时,这里的问题显示以下错误有人可以帮我解决这个问题吗?
scala - Scala SBT elasticsearch-hadoop未解决的依赖关系
添加依赖libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "5.1.1"
和刷新项目时,我得到许多未解决的依赖(级联,org.pentaho,...)。
但是,如果我添加另一个依赖项,就像libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"
它工作一样,我可以在我的 scala 文件中使用该库。
那么,问题是来自 elasticsearch-hadoop 吗?我正在使用 SBT 0.13.13,但也尝试使用 0.13.8。
我从https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/5.1.1获取了依赖项,我知道对于某些依赖项,您还需要添加存储库(解析器 += ...),但在这里它似乎不需要回购。
apache-spark - 通过 PySpark 在 Elasticsearch 中插入数组
我有一个很像这样的案例:
示例数据框:
我想将这些数据插入到 Elasticsearch 中,所以据我研究,我必须转换为索引格式:
然后我尝试插入:
我明白了:
org.apache.spark.SparkException:无法使用 java.util.ArrayList 类型的数据
如果没有该email
字段它可以正常工作,我使用es.output.json: true
and找到了一些建议的解决方案json.dumps
,但它似乎适用于版本 5,所以我尝试在另一个集群中使用 ES v5
然后我得到:
不能使用 java.lang.String 类型的 RDD 元素
感觉坏人
python - 是否可以使用 elasticsearch-hadoop/spark 写入具有格式化日期的动态创建的 Elasticsearch 索引?
在独立的火花中,我试图从数据框写入 Elasticsearch。虽然我可以让它工作,但我不知道如何写入格式为“index_name-{ts_col:{YYYY-mm-dd}}”的动态命名索引,其中“ts_col”是数据集中的日期时间字段。
我看过各种各样的帖子说这种类型的语法应该可以工作,但是当我尝试它时,我得到了底部包含的错误。它似乎在创建索引之前首先检查索引是否存在,但它将未格式化的索引名称传递给该索引名称,而不是动态创建的索引名称。我尝试使用 python elasticsearch 模块首先使用相同的语法创建索引,但它无法处理动态索引名称。
是否有任何可用的解决方案,或者我是否必须在 spark 中遍历我的数据集以查找表示的每个日期,创建我需要的索引,然后一次写入每个索引?我错过了一些明显的东西吗?Logstash 很容易做到这一点,我不明白为什么我不能让它在 Spark 中工作。
这是我正在使用的写入命令(也尝试了它的不同变体):
这是我正在使用的罐子:
这是我使用上面的 write 命令时遇到的错误:
错误 NetworkClient: 节点 [##.##.##.##:9200] 失败(无效的目标 URI HEAD@null/index_name-{ts_col:{YYYY.mm.dd}}/type_name);选择下一个节点 [##.##.##.##:9200]
...
...
Py4JJavaError:调用 o114.save 时出错。:org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException:连接错误(检查网络和/或代理设置)-所有节点都失败;
如果我将覆盖设置为 True,我会得到:
Py4JJavaError:调用 o58.save 时出错。:org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:在 org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java 的 org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488) 没有这样的索引空:446)在 org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) 在 org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363) 在 org.elasticsearch.hadoop.rest。 ScrollQuery.hasNext(ScrollQuery.java:92) at org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500) at org .elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:
如果我尝试使用 Elasticsearch python 客户端提前创建索引,我会得到:
RequestError: TransportError(400, u'invalid_index_name_exception', u'无效索引名 [index_name-{ts_col:YYYY.MM.dd}], 必须小写')
scala - 从 elasticsearch-spark 检索指标
在 ETL 级联作业结束时,我正在使用 elasticsearch-hadoop 使用 Hadoop 计数器公开的 Hadoop 指标提取有关 Elasticsearch摄取的指标。
我想使用 Spark 做同样的事情,但我没有找到与使用Spark 连接器的指标相关的文档。
并非总是如此,但通常情况下,我们将在 EMR (Hadoop) 中执行作业,因此Spark 连接器可能以与级联连接器相同的方式使用 Hadoop。无论如何,我认为情况并非如此,因为我认为它仅适用于像 Cascading 这样的“MapReduce 连接器类型”。
所以我的问题是:
- 如何从 Elasticsearch Spark 连接器中提取指标?
- 如果连接器使用 Hadoop 计数器,当我在 Hadoop Yarn 中执行它时,如何从 Spark 访问 Hadoop 计数器?
版本:
- 斯卡拉 2.11.8
- 火花 2.1
- Hadoop 2.7.2
- 弹性搜索-火花-20_2.11 5.2.2
scala - java.lang.NoSuchMethodError:scala.reflect.api.JavaUniverse.runtimeMirror
似乎 scala 版本不兼容,但我看到 spark、spark 2.10 和 scala 2.11.8 的文档是可以的。
那是我的 pom.xml,这只是一个测试 spark 用 es-hadoop 写入 elasticsearch,我不知道如何解决这个异常。`
这是我的代码
apache-spark - spark如何读取几列elasticsearch?
在es集群中,数据规模很大,我们使用spark计算数据,但是采用的方式elasticsearch-hadoop
,后面是https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
我们必须读取索引的完整列。有什么可以帮助的吗?