1

我正在处理一个文本文件并将转换后的行从 Spark 应用程序写入弹性搜索,如下所示

input.write.format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Append)
      .option("es.resource", "{date}/" + dir).save()

这运行速度非常慢,大约需要 8 分钟才能写入 287.9 MB / 1513789 条记录。 在此处输入图像描述

鉴于网络延迟始终存在,我如何调整 spark 和 elasticsearch 设置以使其更快。

我在本地模式下使用 spark,有 16 个内核和 64GB RAM。我的 elasticsearch 集群有 1 个主节点和 3 个数据节点,每个节点有 16 个内核和 64GB。

我正在阅读如下文本文件

 val readOptions: Map[String, String] = Map("ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "inferSchema" -> "false",
  "header" -> "false",
  "delimiter" -> "\t",
  "comment" -> "#",
  "mode" -> "PERMISSIVE")

……

val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
4

1 回答 1

6

首先,让我们从您的应用程序中发生的事情开始。Apache Spark 正在读取 1 个(不是那么大)csv压缩的文件。因此,第一个 spark 将花时间解压缩数据并在将其写入之前对其进行扫描elasticsearch

这将创建一个带有一个分区Dataset的/ (由您在评论中提到的结果确认)。DataFrame df.rdd.getNumPartitions

一种直接的解决方案是在repartition将数据写入elasticsearch. 现在我不确定你的数据是什么样的,所以决定分区的数量是你这边基准的主题。

val input = sqlContext.read.options(readOptions)
                      .csv(inputFile.getAbsolutePath)
                      .repartition(100) // 100 is just an example
                      .cache

我不确定您的应用程序有多少好处,因为我相信可能存在其他瓶颈(网络 IO、ES 的磁盘类型)。

PS:我应该在构建 ETL 之前将 csv 转换为 parquet 文件。这里有真正的性能增益。(个人意见和基准)

另一个可能的优化是调整es.batch.size.entrieselasticsearch-spark 连接器的设置。默认值为1000

设置此参数时需要小心,因为您可能会使 elasticsearch 过载。我强烈建议您查看此处的可用配置。

我希望这有帮助 !

于 2017-11-24T10:53:52.323 回答