问题标签 [elasticsearch-hadoop]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1171 浏览

apache-spark - rdd.saveAsNewAPIHadoopFile 的 Spark 驱动程序内存和解决方法

我在使用特定的 spark 方法时遇到问题,saveAsNewAPIHadoopFile. 上下文是我正在使用 pyspark,将具有 1k、10k、50k、500k、1m 记录的 RDD 索引到 ElasticSearch (ES) 中。

由于各种原因,Spark 上下文在使用 2gb 驱动程序和单个 2gb 执行程序时的性能非常低下。

直到大约 500k,当我遇到 java 堆大小问题时,我都没有问题。增加到大约4gb spark.driver.memory,我可以索引更多。但是,它的工作时间是有限制的,我们希望索引超过 500k、1m、5m、20m 的记录。

由于各种原因,也受限于使用 pyspark。瓶颈和断点似乎是一个名为 的火花阶段take at SerDeUtil.scala:233,无论 RDD 进入多少个分区,它都会下降到一个,我假设这是驱动程序收集分区并准备索引。

现在 - 我想知道是否有一种有效的方法仍然使用如下方法,考虑到该约束:

为了寻求好的解决方案,我还不如晾一些脏衣服。我有一个非常低效的解决方法,用于zipWithIndex分块 RDD,并将这些子集发送到上面的索引函数。看起来有点像这样:

如果我理解正确的话,它会很慢,因为zipWithIndex,filtermap会针对每个生成的 RDD 子集进行评估。 但是,它的内存效率也很高,因为 500k、1m、5m 等记录永远不会被发送到saveAsNewAPIHadoopFile这些较小的 RDD,相对较小的火花驱动器可以处理。

对于不同方法的任何建议将不胜感激。也许这意味着现在使用Elasticsearch-Hadoop连接器,而是将原始 JSON 发送到 ES?

更新:

看起来我仍然在使用此解决方法时遇到 java 堆空间错误,但离开这里是为了演示对可能解决方法的思考。没想到zipWithIndex会收集驱动程序上的所有内容(我假设这里就是这种情况)

更新#2

这是我尝试运行的 RDD 的调试字符串saveAsNewAPIHadoopFile

更新#3

下面是take at SerDeUtil.scala:233似乎运行的 DAG 可视化driver/localhost

在此处输入图像描述

还有一个 DAG 用于saveAsNewAPIHadoopFile更小的工作(大约 1k 行),因为 500k 行的尝试从未真正触发,因为SerDeUtil上面的阶段似乎触发了较大 RDD 的 java 堆大小问题:

在此处输入图像描述

0 投票
5 回答
5653 浏览

scala - Elasticsearch + Spark:使用自定义文档_id编写json

我正在尝试从 Spark 在 Elasticsearch 中编写对象集合。我必须满足两个要求:

  1. 文档已经用 JSON 序列化,应该按原样编写
  2. _id应提供Elasticsearch 文档

这是我到目前为止所尝试的。

saveJsonToEs()

我尝试这样使用saveJsonToEs()(序列化文档包含_id具有所需 Elasticsearch ID 的字段):

但是elasticsearch-hadoop图书馆给出了这个例外:

如果我删除es.mapping.exclude但保留es.mapping.id并发送带有_id内部的 JSON(如{"_id":"blah",...}

我收到此错误:

当我尝试将此 id 作为不同的字段发送时(例如{"superID":"blah",..."

它无法提取该字段:

当我从配置中删除es.mapping.ides.mapping.exclude从配置中删除时,它可以工作,但文档 ID 是由 Elasticsearch 生成的(这违反了要求 2):

saveToEsWithMeta()

还有另一个功能要提供_id和其他用于插入的元数据saveToEsWithMeta():允许解决需求 2 但因需求 1 而失败。

事实上,Elasticsearch 甚至无法解析elasticsearch-hadoop发送的内容:

问题

是否可以将 Spark 的集合写入(documentID, serializedDocument)Elasticsearch(使用elasticsearch-hadoop)?

PS 我正在使用 Elasticsearch 5.6.3 和 Spark 2.1.1。

0 投票
1 回答
518 浏览

apache-spark - 如何在启用 X-Pack 的情况下设置 Elasticsearch 结构化流?

我正在尝试使用安装了 x-pack 的 Elasticsearch (ES) 6.1.1 Hadoop 来使用 Spark Structured Streaming 2.2.1 写入数据。这是我的代码(索引已经存在于弹性中):

但我确实得到了以下异常:

如何设置所需的身份验证数据?

0 投票
0 回答
396 浏览

elasticsearch - 从 Spark 写入 Elasticsearch,时间戳错误

我有一个单列 Spark 数据框:

使用 spark 写入 elasticsearch 时,updateDate 字段不会被视为日期,而是写入 unix 时间戳 (ms)。

这是摄取的项目:

如果我将 Spark 数据帧写入文件,则日期字段写为: 2017-10-27T00:00:00.000Z

什么可能导致这种行为?

0 投票
0 回答
279 浏览

hadoop - Elasticsearch 拯救了 ES-HADOOP PLUGIN

我们正在使用 ES-HADOOP 插件将数据从 Hadoop HBASE 表推送到 Elasticsearch 集群。以下是集群详细信息。

  • 弹性搜索版本:2.3.5
  • 数据节点:3
  • 主节点:3
  • 客户端节点:1

数据节点也是主节点。

  • 数据/主节点堆:20GB
  • 客户端节点堆:3GB
  • 每个索引的主分片数:5
  • 每个索引的副本分片数:1

当我们在 Spark 上执行作业以及在一段时间后将数据从 Hadoop 推送到 Elasticsearch 的阶段时,我们开始获取ElasticSearch Bailing Out.

我们怀疑 Elasticsearch 可以为 Bulk API 处理的并发连接数超过了 Spark 执行器,因为发布的最大连接数 Elasticsearch 开始拒绝写入请求。

我们如何确定 ElasticSearch 客户端节点可以处理多少并发批量 API 连接并成功写入数据,以及每个批量 API 请求的最大文档数应该是多少?

对于需要在一小时内索引 80-90 GB 数据的写入操作,我们应该研究哪些参数来优化 ElasticSearch 集群?

0 投票
0 回答
138 浏览

python-2.7 - Elasticsearch-Hadoop 格式化多资源写入问题

我正在使用 Elasticsearch-Hadoop 插件将 Elasticsearch 与 Spark 连接,并且我很难将具有timestamp类型列的数据框写入 Elasticsearch。

问题是当我尝试使用动态/多资源格式来创建每日索引时。

相关文档中我得到的印象是这是可能的,但是,除非我将数据帧类型更改为date.

当我尝试写作时,我使用以下内容:

不幸的是,这会导致错误:

.... 引起:java.lang.IllegalArgumentException:无效格式:“2018-03-04 12:36:12.949897”在 org.joda.time.format.DateTimeFormatter.parseDateTime 的“12:36:12.949897”处格式错误(DateTimeFormatter.java:945)

另一方面,如果我在创建数据框时使用日期,则效果很好:

是否可以使用此方法格式化timestamp要写入每日索引的数据框,而无需保留一date列?月度指数如何?

Pyspark 版本:spark 版本 2.2.1 使用 Scala 版本 2.11.8,OpenJDK 64-Bit Server VM,1.8.0_151

ElasticSearch 版本号 "6.2.2" build_hash "10b1edd" build_date "2018-02-16T19:01:30.685723Z" build_snapshot false lucene_version "7.2.1" minimum_wire_compatibility_version "5.6.0" minimum_index_compatibility_version "5.0.0"

0 投票
1 回答
1030 浏览

apache-spark - Spark 写入 Elasticsearch 性能缓慢

我似乎遇到了一个问题,即 Spark 写入 Elasticsearch 的速度非常慢,并且在建立初始连接时需要很长时间(大约 15 分钟),在此期间 Spark 和 Elasticsearch 都保持空闲状态。在弹性社区中还有另一个线程强调了相同的问题,但它已被关闭,没有任何解决方案。

这就是我从 Spark 写入 ES 的方式:

vgDF.write.format("org.elasticsearch.spark.sql").mode('append').option("es.resource", "demoindex/type1").option("es.nodes", "*ES IP*").save()

火花规格

Spark 2.1.0 3 cpu x 10 gb ram x 6 executors running on 3 gce nodesSpark 2.1.0

弹性搜索规范:

8 cpu * 30 gb RAM single node

ES 版本:

Elasticsearch: 6.2.2 ES-Hadoop: 6.2.2

供您参考,Spark 从 Cassandra DB 读取数据,处理结果(但这个过程非常快,大约需要 1 到 2 分钟),然后写入 Elasticsearch。

任何帮助将不胜感激

[编辑]

我还尝试将数据大小从数百万条记录更改为仅 960 条记录,但初始延迟仍然相同(大约 15 分钟)。

0 投票
1 回答
4900 浏览

nested - Pypsark - 使用 collect_list 时保留空值

根据pyspark collect_set 或 collect_list with groupby接受的答案,当您对某个列执行 a 时,该列中的值将被删除。我已经检查过了,这是真的。collect_listnull

但就我而言,我需要保留空列——我怎样才能做到这一点?

我没有找到关于这种collect_list函数变体的任何信息。


背景上下文来解释为什么我想要空值:

我有一个数据框df如下:

我想将其写入具有以下映射的 Elasticsearch 索引:

为了符合上面的嵌套映射,我转换了我的 df,以便对于 eId 和 cId 的每个组合,我都有一个这样的事务数组:

保存df_nested为 json 文件,有我得到的 json 记录:

如您所见 - whencId=1eId=3,我的数组元素之一amount=30.0没有该city属性,因为这是null我的原始数据(df)中的 a 。当我使用该collect_list功能时,将删除空值。

但是,当我尝试使用上述索引将 df_nested 写入 elasticsearch 时,它会出错,因为存在架构不匹配。这基本上就是为什么我想在应用该collect_list函数后保留我的空值的原因。


0 投票
0 回答
47 浏览

apache-spark - Spark如何编写压缩的镶木地板文件?

使用 Apache Spark 1.6.4 和 elasticsearch4hadoop 插件,我将一个 elasticsearch 索引(100m 个文档,100Go,5 个碎片)导出到 HDFS 2.7 中的 gzipped parquet 文件中。

我将此 ETL 作为 Java 程序运行,具有 1 个执行程序(8 个 CPU,12Go RAM)。

5 个任务的过程(因为 5 个 ES 分片)大约需要 1 个小时,大部分时间都可以正常工作,但有时我会看到一些 Spark 任务失败,因为out of memory error.

在此过程中,我可以在 HDFS 中看到一些临时文件,但它们的大小始终为 0。

问:我想知道 Spark 在写入 gz.parquet 文件之前是否将数据保存在内存中?

0 投票
1 回答
712 浏览

scala - Spark Scala - 如何从嵌套的 JSON 构造 Scala Map?

我有一个嵌套的 json 数据,其中包含要提取和构造 Scala 映射的嵌套字段。

下面是示例 JSON:

我想使用saveToES()并构造一个 Scala Map 将字段索引到 ES 索引中,映射如下:

使用 spark.read.json("example.json") 将 json 文件读入数据帧。在这种情况下,构建 Scala Map 的正确方法是什么?

谢谢你的帮助!