问题标签 [elasticsearch-spark]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
426 浏览

apache-spark - 使用 Spark 中的结构化流使 ES ForEachWriter 接收器具有幂等性

我遇到的情况与从卡夫卡的 Spark 结构化蒸汽中描述的情况相同 - 从检查点恢复后再次处理的最后一条消息。当我在失败后重新启动我的 spark 作业时,最后一条消息会再次得到处理。答案之一表明接收器必须是幂等的。我不确定我是否理解这一点。

现在我写到 ES sink 并且 3 种方法实现如下:

  1. open 方法返回 true
  2. process 方法做 Http post 到 ES
  3. close 方法关闭连接

我想知道如何使 ES sink 幂等,以及如果数据已经被处理,如何使用 open 方法中的 2 个参数 partitionId 和 version 返回 false。

提前致谢。

0 投票
1 回答
1135 浏览

apache-spark - Spark + Elastic 搜索写入性能问题

看到使用 spark java 对 Elasticsearch 的低写入次数。

以下是配置

ES集群使用13.xlarge机器

火花:

2 节点 EMR 集群

ES 索引在映射中定义了 16 个分片。

运行作业时具有以下配置,

并使用

使用此配置,我尝试加载 100 万个文档(每个文档的大小为 1300 字节),因此每个 ES 节点加载 500 条记录/文档。

并在火花日志中看到每个任务

火花代码

此外,当我查看 ES 集群中的网络内图时,它非常低,并且我看到 EMR 没有通过网络发送大量数据。有没有办法告诉 Spark 发送正确数量的数据以加快写入速度?

或者

是否还有其他我想调整的配置。因为我看到每个 es 实例每秒 500 个文档较低。有人可以指导此设置缺少的内容以提高我的 es 写入性能吗

提前致谢

0 投票
1 回答
2764 浏览

apache-spark - 从spark写入elasticsearch非常慢

我正在处理一个文本文件并将转换后的行从 Spark 应用程序写入弹性搜索,如下所示

这运行速度非常慢,大约需要 8 分钟才能写入 287.9 MB / 1513789 条记录。 在此处输入图像描述

鉴于网络延迟始终存在,我如何调整 spark 和 elasticsearch 设置以使其更快。

我在本地模式下使用 spark,有 16 个内核和 64GB RAM。我的 elasticsearch 集群有 1 个主节点和 3 个数据节点,每个节点有 16 个内核和 64GB。

我正在阅读如下文本文件

……

0 投票
1 回答
1391 浏览

scala - Spark Group By 和 Rank 功能运行速度很慢

我正在编写一个 spark 应用程序,用于在一个时间范围内查找前 n 个访问的 URL。但是这项工作一直在运行,并且需要几个小时才能389451在 ES 中记录一个实例。我想减少这个时间。

我正在阅读火花中的弹性搜索,如下所示

在上面的 DF 中,我正在从 ElasticSearch 中读取和过滤。此外,我正在从 URI 中删除查询参数。

然后我正在分组

然后我正在运行一个窗口函数

然后我正在写 finalDF 到 cassandra

我在 ES 集群中有 4 个数据节点,我的 Spark 机器是 16 核 64GB Ram VM。请帮我找出问题所在。

0 投票
0 回答
193 浏览

apache-spark - 投射错误检测到的架构。Pyspark-Elasticsearch

我正在使用 Pyspark 从 ElasticSearch 索引中读取地理点数据。我正在使用以下命令创建我的 DataFrame。

us_df = spark.read.format('es').option('es.query', us_q).option('es.read.field.as.array.include', 'extra_tags').load('index').select('centroid.lat', 'centroid.lon')

当我打印 DataFrame 的架构时,我得到的架构为

但是当我尝试获取前 10 条记录时

us_df.take(10)

我收到一个错误

我已经看到了跟踪,并且纬度和经度作为字符串从 ES 返回

{"_index":"places","_type":"us_place","_id":"548776421","_score":null,"_source":{"centroid":{"lon":"-87.739957","lat":"41.871084"}}

s 被推断为 double,在读取数据时,它会引发错误。

在创建 DataFrame 时,如何强制将纬度和经度解释为字符串而不是加倍。

Spark 版本 - 2.1.1 ES-Hadoop jar - elasticsearch-spark-20_2.11-5.2.2.jar ES 版本 - 5.2.2

谢谢

0 投票
2 回答
2899 浏览

apache-spark - 弹性搜索无法写入所有条目:可能是es超载了

我有一个应用程序,我在其中读取 csv 文件并进行一些转换,然后将它们从 spark 本身推送到弹性搜索。像这样

我有几个节点,在每个节点中,我运行 5-6 个spark-submit推送到elasticsearch

我经常收到错误

我的 Elasticsearch 集群具有以下统计信息 -

我已经修改了 elasticseach 的以下参数

任何人都可以建议,我可以解决什么来摆脱这些错误?

0 投票
0 回答
1081 浏览

apache-spark - 使用自定义映射 ID 从 spark 写入弹性搜索时出错

我正在尝试使用自定义映射 ID 编写从 spark 到 Elastic 的数据帧。当我这样做时,我收到以下错误。

以下是用于写入 ES 的配置。

我使用的是 5.6 版本的 ES 和 2.2.0 的 Spark。让我知道你们是否对此有任何见解。

谢谢。!

0 投票
2 回答
5971 浏览

elasticsearch - 异常-“网络/Elasticsearch 集群不可访问或针对 WAN/云实例时”

我尝试运行 Spark 应用程序以集成 Hbase 和 ES。我曾尝试在 ES 中创建索引并从 HBase 存储数据,但在连接到 ES 服务器时收到“用户未经授权或访问被拒绝”的问题。

我已与运营团队核对并退回 ES 服务器,尝试运行应用程序并获得附加的异常 - 线程“主”org.elasticsearch.hadoop.EsHadoopIllegalArgumentException 中的异常:无法检测 ES 版本 - 通常如果网络/无法访问 Elasticsearch 集群,或者在 org.elasticsearch 的 org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:327) 中没有正确设置“es.nodes.wan.only”的情况下定位 WAN/Cloud 实例。 spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:103) at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:79) at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark .scala:74) 在 org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.标量:55)

我正在使用 Elasticsearch 6.1.1 v。如果有人遇到此问题并清除异常,请告诉我

0 投票
1 回答
492 浏览

scala - Elasticsearch 火花读取缓慢

v6.2使用规定的 spark 连接器从 Elasticsearch 读取到 sparkorg.elasticsearch:elasticsearch-spark-20_2.11:6.3.2非常慢。这是来自具有索引的 3 节点 ES 集群:

在(10 个节点,1tb 内存,>50 个 VCPU)spark 集群上读取:

执行需要 10 分钟。 在此处输入图像描述

这是(缓慢地)它应该如何工作,还是我做错了什么?

0 投票
1 回答
83 浏览

apache-spark - 如何在 writeStream 到 Elasticsearch 之前将 JSON 数组转换为行?

跟进这个问题

我有格式与以下相同的 JSON 流数据

我需要将其转换为以下格式

为了实现这一点,按照上一个问题的建议进行了转换。

现在我需要将数据保存到 ElasticSearch。

我收到 ElasticSearch 不支持Append输出模式的错误。在模式下,它无法通过聚合Append写入失败,无法在模式下完成。我能够在完整模式下写入控制台。我现在如何将数据写入 ElasticSearchwriteStreamAppend

任何帮助将不胜感激。