问题标签 [elasticsearch-hadoop]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1093 浏览

apache-spark - pyspark,弹性搜索输入,无法显示数据框

df.head()加载弹性搜索数据后我可以做得很好。但是在我做之后withColumn,我不能做df.headdf.show()

我不知道发生了什么,withColumn如果我创建相同的代码df2 = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))但不能使用来自 elasticsearch 的数据,则相同的代码可以正常工作。

我正在使用来自https://www.elastic.co/guide/en/kibana/current/tutorial-load-dataset.html的官方演示数据(accounts.zip)

错误消息是

编辑

这是 df.show() 和 df.dtypes

0 投票
1 回答
828 浏览

python - 如何在不查询每个节点的情况下使用 PySpark 对 Elasticsearch 运行查询?

我的最终目标是使用 PySpark 有效地索引 Elasticsearch (ES) 中的大量数据,然后针对索引运行大量查询并记录结果的统计信息。

考虑以下代码:

我只是想对索引运行 match all 查询,我只想要最高的结果。我尝试在 ES 查询中表达限制,但显然 Spark 忽略了这一点,所以我用数据框过滤器来表达它。

我已将 Spark 配置如下:

我正在访问 VPC 后面的 ES 集群,因此我只能访问客户端节点,而不能访问内部数据等节点。这就是wan.only设置为 true 的原因。

通过这种设置,Spark 似乎会以完全匹配的方式查询每个节点,然后最终合并为我真正想要的单个结果。它非常慢(50 个分片,3000 万个文档),它完全避免了 ES 有效减少每个节点本身的结果的能力。即使我将查询更改为通过单个文档 ID 专门搜索,它也会通过主节点通过在每次调用中指定特定的分片 ID 来针对每个单独的分片运行查询。我尝试将 设置es.nodes.client.only为 true,但这抱怨设置与wan.only. 如果启用client.only和禁用,wan.only我将无法再连接到集群,因为它会尝试直接连接无法访问的每个节点。

我在这里做错了什么?如何使用 PySpark 对 ES 运行一次查询,而不是为每个分片运行一次。此外,如果 PySpark 尝试在每个分片上运行完整查询然后看似 post 处理结果,我该如何使用from,size和我的查询之类的东西?rescore

0 投票
1 回答
337 浏览

apache-spark - Elasticsearch Spark,如何多次查询?

我在 jupyter 笔记本上。

我想使用查询 dsl 来准备初始数据框。

我用conf.set("es.query", dsl_query)它。(https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#_querying

但是,我想应用不同的查询来准备另一个数据框,我找不到一种方法来应用新的 dsl_query 而不创建新的SparkContext

但我也没有找到重新创建SparkContext内部 jupyter 环境的方法。

我想使用 QueryDSL-1 作为基线运行分析,然后使用 QueryDSL-2 作为另一个基线运行另一个分析

有没有办法在不创建两个笔记本的情况下做到这一点?

0 投票
1 回答
315 浏览

elasticsearch - 从 HDFS 到 ES 的数据加载需要很长时间

我在 hive 中创建了一个外部表,需要将数据移动到 ES(2 个节点,每个节点 1 TB)。对于具有 9GB 数据的源表,以下常规查询需要很长时间(超过 6 小时)。

ES 索引具有默认的 5 个分片和 1 个副本。增加分片的数量可以加快摄取速度吗?有人可以提出任何改进以加快 ES 节点摄取。

0 投票
2 回答
3843 浏览

apache-spark - load() 在 spark 中做了什么?

spark很懒吧?那么做load()什么呢?

如果show()是唯一的动作,我想load不会像 1 秒那样花费太多时间。所以我的结论load()是一个动作(而不是火花的转变)

load 是否实际上将整个数据加载到内存中?我不这么认为,但它有什么作用呢?

我已经搜索并查看了文档https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html但它没有帮助..

0 投票
1 回答
320 浏览

python - 为不同的查询创建新的 SparkSession?

我想从 elasticsearch 获取两个数据

一个用查询过滤,另一个没有过滤器。

问题是会话是否被重用..

当我filtered先运行查询,然后再non-filtered查询时,两者都给出空结果

但是当我non-filtered第一次运行查询时,它显示了一些结果,随后的filtered查询显示了空结果。

** 编辑

因此,我可以通过以下方式获得所需的结果:

0 投票
1 回答
111 浏览

spark-structured-streaming - 如何使用 spark 结构化流在 elasticsearch sink 中设置动态文档 ID

在弹性搜索写入接收器中,我应该如何添加带有来自数据集字段的动态值的文档 ID。在我的情况下,我需要根据格式化数据集中的特定字段设置文档 ID。遇到“es.mapping.id”,但我将如何从我的数据集中获取值?

0 投票
1 回答
64 浏览

apache-spark - 如何确保在 Spark 流中使用 Elasticsearch-Hadoop 连接器写入 Elasticsearch 集成的所有文档

我正在使用 Elasticsearch-Hadoop 连接器将 DStream 写入 Elasticsearch。这是您可以找到连接器的链接 https://www.elastic.co/guide/en/elasticsearch/hadoop/5.6/spark.html

我需要处理窗口,使用“JavaEsSpark.saveToEs”方法将所有文档写入 ES,并希望确保所有文档写入并将偏移量提交到 Kafka。由于 JavaEsSpark.saveToEs 以批处理模式插入文档,因此我无法跟踪我的文档。

我的基本代码如下。有意见吗?

0 投票
1 回答
69 浏览

amazon-web-services - Databricks 工作人员和 Elasticsearch 节点是否需要位于 AWS 的同一 VPC 中?

我想从 Databricks 中将数据框写入 Elasticsearch。

我的 Elasticsearch 集群托管在 AWS 上,Databricks 正在启动具有特定角色的 EC2 实例。该角色有权与我的 Elasticsearch 集群进行交互,但由于某种原因,我似乎甚至无法 PING Elasticsearch 集群。

尝试 PING 我的集群失败

我是否需要想办法将我的 Databricks 工作人员和我的 Elasticsearch 集群压缩到同一个 VPC 中?听起来像是 CloudFormation 的噩梦。

0 投票
1 回答
684 浏览

apache-spark - Spark 2.4 到 Elasticsearch:防止 Dataproc 节点退役期间的数据丢失?

我的技术任务是将数据从 GCS(谷歌云存储)同步到我们的 Elasticsearch 集群。

我们在 Google Dataproc 集群上使用 Apache Spark 2.4 和 Elastic Hadoop 连接器(启用自动缩放)。

在执行过程中,如果 Dataproc 集群规模缩小,则停用节点上的所有任务都会丢失,并且该节点上处理过的数据永远不会推送到弹性。

例如,当我保存到 GCS 或 HDFS 时,不存在此问题。

即使节点退役,如何使这项任务具有弹性?

堆栈跟踪的摘录:

第 2.3 阶段丢失任务 50.0(TID 427,xxxxxxx-sw-vrb7.c.xxxxxxx,执行程序 43):FetchFailed(BlockManagerId(30,xxxxxxx-w-23.c.xxxxxxx,7337,无),shuffleId=0,mapId =26, reduceId=170, message=org.apache.spark.shuffle.FetchFailedException: 无法连接到 xxxxxxx-w-23.c.xxxxxxx:7337

引起:java.net.UnknownHostException: xxxxxxx-w-23.c.xxxxxxx

阶段 2.3 (TID 427) 中的任务 50.0 失败,但不会重新执行该任务(可能是因为任务因 shuffle 数据获取失败而失败,因此需要重新运行上一个阶段,或者因为不同的副本任务已经成功)。

谢谢。弗雷德