问题标签 [rdd]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
4 回答
149909 浏览

performance - Apache Spark:地图与地图分区?

RDD mapmapPartitions方法有什么区别?并且flatMap表现得像map还是像mapPartitions?谢谢。

(编辑)即两者之间有什么区别(在语义上或在执行方面)

和:

0 投票
1 回答
16168 浏览

scala - 修改 Spark RDD foreach 中的集合

我试图在迭代 RDD 的元素时向地图添加元素。我没有收到任何错误,但没有进行修改。

直接添加或迭代其他集合都可以正常工作:

但是当我尝试从 RDD 做同样的事情时:

我尝试像在 foreach 之前一样打印地图的内容,以确保变量相同,并且打印正确:

我还在 foreach 代码中打印了地图的修改元素,它打印为已修改,但是当操作完成时,地图似乎未修改。

将 RDD 转换为数组(收集)也可以正常工作:

这是上下文问题吗?我是否正在访问正在其他地方修改的数据的副本?

0 投票
1 回答
3631 浏览

scala - Scala Spark 中 RDD 的嵌套

提到这个问题: Scala Spark 中的 NullPointerException,似乎是由集合类型引起的?

答案指出“Spark 不支持 RDD 的嵌套(请参阅https://stackoverflow.com/a/14130534/590203以了解另一个相同问题的出现),因此您无法对其他 RDD 内的 RDD 执行转换或操作操作。”

这段代码:

印刷 :

这是对的。

但这与“不能在其他 RDD 操作中对 RDD 执行转换或操作”不矛盾吗?因为在 RDD 上发生了嵌套操作?

0 投票
3 回答
41726 浏览

scala - 如何在 Scala Spark 中对 RDD 进行排序?

阅读 Spark 方法 sortByKey :

是否可以只返回“N”个结果。所以不是返回所有结果,而是返回前 10 个。我可以将排序的集合转换为数组并使用take方法,但由于这是一个 O(N) 操作,有没有更有效的方法?

0 投票
3 回答
13126 浏览

scala - 在 Scala Spark 中找不到 reduceByKey 方法

尝试从源代码运行http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala

这一行:

正在抛出错误

logData.flatMap(line => line.split(" ")).map(word => (word, 1))返回一个 MappedRDD 但我在http://spark.apache.org/docs/0.9.1/api/core/index.html#org.apache.spark.rdd.RDD中找不到这种类型

我正在从 Spark 源运行此代码,所以可能是类路径问题?但是所需的依赖项在我的类路径上。

0 投票
3 回答
1777 浏览

apache-spark - Spark Streaming 未将任务分配到集群上的节点

我有两个节点独立集群用于火花流处理。下面是我的示例代码,它演示了我正在执行的过程。

我的问题是 spark 没有将此状态 RDD 分配给多个节点或没有将任务分配给其他节点并导致响应的高延迟,我的输入负载约为每秒 100,000 个元组。

我已经尝试过以下事情,但没有任何效果

1)spark.locality.wait到 1 秒

2)减少分配给执行程序进程的内存以检查天气火花分发RDD或任务,但即使它超出了驱动器也在运行的第一个节点(m1)的内存限制。

3) 将 spark.streaming.concurrentJobs 从 1(默认)增加到 3

4) 我检查了流 ui 存储,状态 dstream RDD 大约有 20 个分区,都位于本地节点 m1 上。

如果我运行 SparkPi 100000,那么 spark 能够在几秒钟(30-40)后利用另一个节点,所以我确信我的集群配置很好。

编辑

我注意到的一件事是,即使对于我的 RDD,如果我设置存储级别 MEMORY_AND_DISK_SER_2 然后也在应用程序 ui 存储中显示Memory Serialized 1x Replicated

0 投票
3 回答
5460 浏览

hadoop - 使用 Spark 多次写入 hadoop 分布式文件系统

我创建了一个 spark 作业,它每天从我的 hdfs 中读取一个文本文件,并从文本文件的每一行中提取唯一键。每个文本文件中大约有 50000 个键。然后通过提取的密钥过滤相同的数据并保存到 hdfs。

我想在我的 hdfs 中创建一个目录,其结构为: hdfs://.../date/key 包含过滤后的数据。问题是写入 hdfs 需要长时间,因为键太多了。

现在的写法:

有没有办法让它更快?我曾考虑将数据重新分区为提取的密钥数量,但我无法以 hdfs://.../date/key 格式保存。我也尝试过 groupByKey 但我无法保存这些值,因为它们不是 RDD。

任何帮助表示赞赏:)

0 投票
1 回答
867 浏览

scala - 如何使用加入的 RDD

假设我有一个名为 1.txt 和 2.txt 的文本文件。1.txt 包含

和 2.txt 包含

所以,我通过他们的键(第一列)加入了两者:

现在,如果我理解正确,我得到

现在,假设我需要总结1.txt第二列的所有值,

  1. 我该怎么做呢?

  2. 如何在加入的RDD中引用2.txt(即g,i,k)的第二列?

  3. 有没有很好的使用 RDD 的教程?我是一个火花(和斯卡拉)新手。

0 投票
1 回答
4259 浏览

scala - scala.MatchError:火花 RDD 上为空

我对 spark 和 scala 都比较陌生。我试图在 spark 上使用 scala 实现协同过滤。下面是代码

它在最后一行抛出一个scala.MatchError: null org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)如果我将 distinctUsers rdd 收集到一个数组中并执行相同的代码,Thw 代码可以正常工作:

处理 RDD 时我在哪里弄错了?

Spark 版本:1.0.0 Scala 版本:2.10.4

0 投票
3 回答
5404 浏览

scala - 如何对 RDD 进行分区

我有一个文本文件,其中包含大量由空格分隔的随机浮点值。我正在将此文件加载到 scala 中的 RDD 中。这个RDD是如何分区的?

此外,是否有任何方法可以生成自定义分区,以便所有分区具有相同数量的元素以及每个分区的索引?

在这里,我从 HDFS 加载多个文本文件,进程是我正在调用的函数。我可以使用 mapPartitonsWithIndex 的解决方案以及如何在流程函数中访问该索引吗?Map 对分区进行洗牌。