问题标签 [rdd]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 修改 Spark RDD foreach 中的集合
我试图在迭代 RDD 的元素时向地图添加元素。我没有收到任何错误,但没有进行修改。
直接添加或迭代其他集合都可以正常工作:
但是当我尝试从 RDD 做同样的事情时:
我尝试像在 foreach 之前一样打印地图的内容,以确保变量相同,并且打印正确:
我还在 foreach 代码中打印了地图的修改元素,它打印为已修改,但是当操作完成时,地图似乎未修改。
将 RDD 转换为数组(收集)也可以正常工作:
这是上下文问题吗?我是否正在访问正在其他地方修改的数据的副本?
scala - Scala Spark 中 RDD 的嵌套
提到这个问题: Scala Spark 中的 NullPointerException,似乎是由集合类型引起的?
答案指出“Spark 不支持 RDD 的嵌套(请参阅https://stackoverflow.com/a/14130534/590203以了解另一个相同问题的出现),因此您无法对其他 RDD 内的 RDD 执行转换或操作操作。”
这段代码:
印刷 :
这是对的。
但这与“不能在其他 RDD 操作中对 RDD 执行转换或操作”不矛盾吗?因为在 RDD 上发生了嵌套操作?
scala - 如何在 Scala Spark 中对 RDD 进行排序?
阅读 Spark 方法 sortByKey :
是否可以只返回“N”个结果。所以不是返回所有结果,而是返回前 10 个。我可以将排序的集合转换为数组并使用take
方法,但由于这是一个 O(N) 操作,有没有更有效的方法?
scala - 在 Scala Spark 中找不到 reduceByKey 方法
尝试从源代码运行http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala。
这一行:
正在抛出错误
logData.flatMap(line => line.split(" ")).map(word => (word, 1))
返回一个 MappedRDD 但我在http://spark.apache.org/docs/0.9.1/api/core/index.html#org.apache.spark.rdd.RDD中找不到这种类型
我正在从 Spark 源运行此代码,所以可能是类路径问题?但是所需的依赖项在我的类路径上。
apache-spark - Spark Streaming 未将任务分配到集群上的节点
我有两个节点独立集群用于火花流处理。下面是我的示例代码,它演示了我正在执行的过程。
我的问题是 spark 没有将此状态 RDD 分配给多个节点或没有将任务分配给其他节点并导致响应的高延迟,我的输入负载约为每秒 100,000 个元组。
我已经尝试过以下事情,但没有任何效果
1)spark.locality.wait
到 1 秒
2)减少分配给执行程序进程的内存以检查天气火花分发RDD或任务,但即使它超出了驱动器也在运行的第一个节点(m1)的内存限制。
3) 将 spark.streaming.concurrentJobs 从 1(默认)增加到 3
4) 我检查了流 ui 存储,状态 dstream RDD 大约有 20 个分区,都位于本地节点 m1 上。
如果我运行 SparkPi 100000,那么 spark 能够在几秒钟(30-40)后利用另一个节点,所以我确信我的集群配置很好。
编辑
我注意到的一件事是,即使对于我的 RDD,如果我设置存储级别 MEMORY_AND_DISK_SER_2 然后也在应用程序 ui 存储中显示Memory Serialized 1x Replicated
hadoop - 使用 Spark 多次写入 hadoop 分布式文件系统
我创建了一个 spark 作业,它每天从我的 hdfs 中读取一个文本文件,并从文本文件的每一行中提取唯一键。每个文本文件中大约有 50000 个键。然后通过提取的密钥过滤相同的数据并保存到 hdfs。
我想在我的 hdfs 中创建一个目录,其结构为: hdfs://.../date/key 包含过滤后的数据。问题是写入 hdfs 需要很长时间,因为键太多了。
现在的写法:
有没有办法让它更快?我曾考虑将数据重新分区为提取的密钥数量,但我无法以 hdfs://.../date/key 格式保存。我也尝试过 groupByKey 但我无法保存这些值,因为它们不是 RDD。
任何帮助表示赞赏:)
scala - 如何使用加入的 RDD
假设我有一个名为 1.txt 和 2.txt 的文本文件。1.txt 包含
和 2.txt 包含
所以,我通过他们的键(第一列)加入了两者:
现在,如果我理解正确,我得到
现在,假设我需要总结1.txt第二列的所有值,
我该怎么做呢?
如何在加入的RDD中引用2.txt(即g,i,k)的第二列?
有没有很好的使用 RDD 的教程?我是一个火花(和斯卡拉)新手。
scala - scala.MatchError:火花 RDD 上为空
我对 spark 和 scala 都比较陌生。我试图在 spark 上使用 scala 实现协同过滤。下面是代码
它在最后一行抛出一个scala.MatchError: null org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)如果我将 distinctUsers rdd 收集到一个数组中并执行相同的代码,Thw 代码可以正常工作:
处理 RDD 时我在哪里弄错了?
Spark 版本:1.0.0 Scala 版本:2.10.4
scala - 如何对 RDD 进行分区
我有一个文本文件,其中包含大量由空格分隔的随机浮点值。我正在将此文件加载到 scala 中的 RDD 中。这个RDD是如何分区的?
此外,是否有任何方法可以生成自定义分区,以便所有分区具有相同数量的元素以及每个分区的索引?
在这里,我从 HDFS 加载多个文本文件,进程是我正在调用的函数。我可以使用 mapPartitonsWithIndex 的解决方案以及如何在流程函数中访问该索引吗?Map 对分区进行洗牌。