问题标签 [elasticsearch-hadoop]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - rdd.saveAsNewAPIHadoopFile 的 Spark 驱动程序内存和解决方法
我在使用特定的 spark 方法时遇到问题,saveAsNewAPIHadoopFile
. 上下文是我正在使用 pyspark,将具有 1k、10k、50k、500k、1m 记录的 RDD 索引到 ElasticSearch (ES) 中。
由于各种原因,Spark 上下文在使用 2gb 驱动程序和单个 2gb 执行程序时的性能非常低下。
直到大约 500k,当我遇到 java 堆大小问题时,我都没有问题。增加到大约4gb spark.driver.memory
,我可以索引更多。但是,它的工作时间是有限制的,我们希望索引超过 500k、1m、5m、20m 的记录。
由于各种原因,也受限于使用 pyspark。瓶颈和断点似乎是一个名为 的火花阶段take at SerDeUtil.scala:233
,无论 RDD 进入多少个分区,它都会下降到一个,我假设这是驱动程序收集分区并准备索引。
现在 - 我想知道是否有一种有效的方法仍然使用如下方法,考虑到该约束:
为了寻求好的解决方案,我还不如晾一些脏衣服。我有一个非常低效的解决方法,用于zipWithIndex
分块 RDD,并将这些子集发送到上面的索引函数。看起来有点像这样:
如果我理解正确的话,它会很慢,因为zipWithIndex
,filter
和map
会针对每个生成的 RDD 子集进行评估。 但是,它的内存效率也很高,因为 500k、1m、5m 等记录永远不会被发送到saveAsNewAPIHadoopFile
这些较小的 RDD,相对较小的火花驱动器可以处理。
对于不同方法的任何建议将不胜感激。也许这意味着现在使用Elasticsearch-Hadoop
连接器,而是将原始 JSON 发送到 ES?
更新:
看起来我仍然在使用此解决方法时遇到 java 堆空间错误,但离开这里是为了演示对可能解决方法的思考。没想到zipWithIndex
会收集驱动程序上的所有内容(我假设这里就是这种情况)
更新#2
这是我尝试运行的 RDD 的调试字符串saveAsNewAPIHadoopFile
:
更新#3
下面是take at SerDeUtil.scala:233
似乎运行的 DAG 可视化driver/localhost
:
还有一个 DAG 用于saveAsNewAPIHadoopFile
更小的工作(大约 1k 行),因为 500k 行的尝试从未真正触发,因为SerDeUtil
上面的阶段似乎触发了较大 RDD 的 java 堆大小问题:
scala - Elasticsearch + Spark:使用自定义文档_id编写json
我正在尝试从 Spark 在 Elasticsearch 中编写对象集合。我必须满足两个要求:
- 文档已经用 JSON 序列化,应该按原样编写
_id
应提供Elasticsearch 文档
这是我到目前为止所尝试的。
saveJsonToEs()
我尝试这样使用saveJsonToEs()
(序列化文档包含_id
具有所需 Elasticsearch ID 的字段):
但是elasticsearch-hadoop
图书馆给出了这个例外:
如果我删除es.mapping.exclude
但保留es.mapping.id
并发送带有_id
内部的 JSON(如{"_id":"blah",...}
)
我收到此错误:
当我尝试将此 id 作为不同的字段发送时(例如{"superID":"blah",..."
:
它无法提取该字段:
当我从配置中删除es.mapping.id
并es.mapping.exclude
从配置中删除时,它可以工作,但文档 ID 是由 Elasticsearch 生成的(这违反了要求 2):
saveToEsWithMeta()
还有另一个功能要提供_id
和其他用于插入的元数据saveToEsWithMeta()
:允许解决需求 2 但因需求 1 而失败。
事实上,Elasticsearch 甚至无法解析elasticsearch-hadoop
发送的内容:
问题
是否可以将 Spark 的集合写入(documentID, serializedDocument)
Elasticsearch(使用elasticsearch-hadoop
)?
PS 我正在使用 Elasticsearch 5.6.3 和 Spark 2.1.1。
apache-spark - 如何在启用 X-Pack 的情况下设置 Elasticsearch 结构化流?
我正在尝试使用安装了 x-pack 的 Elasticsearch (ES) 6.1.1 Hadoop 来使用 Spark Structured Streaming 2.2.1 写入数据。这是我的代码(索引已经存在于弹性中):
但我确实得到了以下异常:
如何设置所需的身份验证数据?
elasticsearch - 从 Spark 写入 Elasticsearch,时间戳错误
我有一个单列 Spark 数据框:
使用 spark 写入 elasticsearch 时,updateDate 字段不会被视为日期,而是写入 unix 时间戳 (ms)。
这是摄取的项目:
如果我将 Spark 数据帧写入文件,则日期字段写为: 2017-10-27T00:00:00.000Z。
什么可能导致这种行为?
hadoop - Elasticsearch 拯救了 ES-HADOOP PLUGIN
我们正在使用 ES-HADOOP 插件将数据从 Hadoop HBASE 表推送到 Elasticsearch 集群。以下是集群详细信息。
- 弹性搜索版本:2.3.5
- 数据节点:3
- 主节点:3
- 客户端节点:1
数据节点也是主节点。
- 数据/主节点堆:20GB
- 客户端节点堆:3GB
- 每个索引的主分片数:5
- 每个索引的副本分片数:1
当我们在 Spark 上执行作业以及在一段时间后将数据从 Hadoop 推送到 Elasticsearch 的阶段时,我们开始获取ElasticSearch Bailing Out
.
我们怀疑 Elasticsearch 可以为 Bulk API 处理的并发连接数超过了 Spark 执行器,因为发布的最大连接数 Elasticsearch 开始拒绝写入请求。
我们如何确定 ElasticSearch 客户端节点可以处理多少并发批量 API 连接并成功写入数据,以及每个批量 API 请求的最大文档数应该是多少?
对于需要在一小时内索引 80-90 GB 数据的写入操作,我们应该研究哪些参数来优化 ElasticSearch 集群?
python-2.7 - Elasticsearch-Hadoop 格式化多资源写入问题
我正在使用 Elasticsearch-Hadoop 插件将 Elasticsearch 与 Spark 连接,并且我很难将具有timestamp
类型列的数据框写入 Elasticsearch。
问题是当我尝试使用动态/多资源格式来创建每日索引时。
从相关文档中我得到的印象是这是可能的,但是,除非我将数据帧类型更改为date
.
当我尝试写作时,我使用以下内容:
不幸的是,这会导致错误:
.... 引起:java.lang.IllegalArgumentException:无效格式:“2018-03-04 12:36:12.949897”在 org.joda.time.format.DateTimeFormatter.parseDateTime 的“12:36:12.949897”处格式错误(DateTimeFormatter.java:945)
另一方面,如果我在创建数据框时使用日期,则效果很好:
是否可以使用此方法格式化timestamp
要写入每日索引的数据框,而无需保留一date
列?月度指数如何?
Pyspark 版本:spark 版本 2.2.1 使用 Scala 版本 2.11.8,OpenJDK 64-Bit Server VM,1.8.0_151
ElasticSearch 版本号 "6.2.2" build_hash "10b1edd" build_date "2018-02-16T19:01:30.685723Z" build_snapshot false lucene_version "7.2.1" minimum_wire_compatibility_version "5.6.0" minimum_index_compatibility_version "5.0.0"
apache-spark - Spark 写入 Elasticsearch 性能缓慢
我似乎遇到了一个问题,即 Spark 写入 Elasticsearch 的速度非常慢,并且在建立初始连接时需要很长时间(大约 15 分钟),在此期间 Spark 和 Elasticsearch 都保持空闲状态。在弹性社区中还有另一个线程强调了相同的问题,但它已被关闭,没有任何解决方案。
这就是我从 Spark 写入 ES 的方式:
vgDF.write.format("org.elasticsearch.spark.sql").mode('append').option("es.resource", "demoindex/type1").option("es.nodes", "*ES IP*").save()
火花规格
Spark 2.1.0
3 cpu x 10 gb ram x 6 executors
running on 3 gce nodesSpark 2.1.0
弹性搜索规范:
8 cpu * 30 gb RAM single node
ES 版本:
Elasticsearch: 6.2.2
ES-Hadoop: 6.2.2
供您参考,Spark 从 Cassandra DB 读取数据,处理结果(但这个过程非常快,大约需要 1 到 2 分钟),然后写入 Elasticsearch。
任何帮助将不胜感激
[编辑]
我还尝试将数据大小从数百万条记录更改为仅 960 条记录,但初始延迟仍然相同(大约 15 分钟)。
nested - Pypsark - 使用 collect_list 时保留空值
根据pyspark collect_set 或 collect_list with groupby中接受的答案,当您对某个列执行 a 时,该列中的值将被删除。我已经检查过了,这是真的。collect_list
null
但就我而言,我需要保留空列——我怎样才能做到这一点?
我没有找到关于这种collect_list
函数变体的任何信息。
背景上下文来解释为什么我想要空值:
我有一个数据框df
如下:
我想将其写入具有以下映射的 Elasticsearch 索引:
为了符合上面的嵌套映射,我转换了我的 df,以便对于 eId 和 cId 的每个组合,我都有一个这样的事务数组:
保存df_nested
为 json 文件,有我得到的 json 记录:
如您所见 - whencId=1
和eId=3
,我的数组元素之一amount=30.0
没有该city
属性,因为这是null
我的原始数据(df
)中的 a 。当我使用该collect_list
功能时,将删除空值。
但是,当我尝试使用上述索引将 df_nested 写入 elasticsearch 时,它会出错,因为存在架构不匹配。这基本上就是为什么我想在应用该collect_list
函数后保留我的空值的原因。
apache-spark - Spark如何编写压缩的镶木地板文件?
使用 Apache Spark 1.6.4 和 elasticsearch4hadoop 插件,我将一个 elasticsearch 索引(100m 个文档,100Go,5 个碎片)导出到 HDFS 2.7 中的 gzipped parquet 文件中。
我将此 ETL 作为 Java 程序运行,具有 1 个执行程序(8 个 CPU,12Go RAM)。
5 个任务的过程(因为 5 个 ES 分片)大约需要 1 个小时,大部分时间都可以正常工作,但有时我会看到一些 Spark 任务失败,因为out of memory error
.
在此过程中,我可以在 HDFS 中看到一些临时文件,但它们的大小始终为 0。
问:我想知道 Spark 在写入 gz.parquet 文件之前是否将数据保存在内存中?
scala - Spark Scala - 如何从嵌套的 JSON 构造 Scala Map?
我有一个嵌套的 json 数据,其中包含要提取和构造 Scala 映射的嵌套字段。
下面是示例 JSON:
我想使用saveToES()并构造一个 Scala Map 将字段索引到 ES 索引中,映射如下:
使用 spark.read.json("example.json") 将 json 文件读入数据帧。在这种情况下,构建 Scala Map 的正确方法是什么?
谢谢你的帮助!