问题标签 [rdd]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
373 浏览

apache-spark - 大 RDD 与多个小 RDD

历史数据:

  1. 每个活动我有多个表,其中包含一些历史信息,例如 GRP 和 CPP
  2. 我有多个维度,为每个活动定义了 GRP 和 CPP
    维度 - 地理、时间周期、主消息
  3. 每个活动可能包含这些维度的子集

例子

用例:

  1. 有时我想查看所有活动的跨时间段的数据(此维度适用于所有活动)

  2. 有时我想查看跨地区的数据(这个维度存在于一些活动中)

我必须设计 RDD,以便我的所有用例都能有效地工作。

在任何给定的时间点,每项工作都将迎合一个单一的活动。

我有两个选择——

  1. 为每个活动创建一个 RDD,并在该表中跨维度对其进行分区。因此,我将拥有与活动一样多的 RDD。
    对于每项工作,我将访问特定的 RDD 并计算

  2. 为所有活动创建一个 RDD 并在某个维度上对其进行分区
    对于每个作业,我将访问单个大 RDD 并对该 RDD 执行过滤以进行活动并进行计算

我的问题是哪个选项在给定用例和假设的情况下设计 RDD 更有效。

蒂亚!

0 投票
2 回答
46569 浏览

scala - 使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

有没有办法RDD在spark中连接两个不同s的数据集?

要求是 - 我使用具有相同列名的 scala 创建了两个中间 RDD,需要组合两个 RDD 的这些结果并缓存结果以访问 UI。我如何在这里合并数据集?

RDD 的类型spark.sql.SchemaRDD

0 投票
1 回答
2753 浏览

scala - 如何在其他 RDD 映射方法中使用 RDD?

我得到了一个名为 index: RDD[(String, String)] 的 rdd,我想用 index 来处理我的文件。这是代码:

问题是我不能在 file.map 函数中使用索引,我运行了这个程序,它给了我这样的反馈:

我不知道为什么。如果我想实现这个功能,我该怎么办?谢谢

0 投票
4 回答
77785 浏览

scala - 加入两个带有/不带 Spark SQL 的普通 RDD

RDDs我需要在一个/多个列上加入两个普通的。逻辑上这个操作相当于两个表的数据库连接操作。我想知道这是否只能通过Spark SQL或有其他方法来实现。

作为一个具体的例子,考虑r1带有主键的 RDD ITEM_ID

r2和带有主键的RDD COMPANY_ID

我想加入r1r2

如何才能做到这一点?

0 投票
1 回答
1066 浏览

scala - 标准化 RDD

假设我有一个双打 RDD,我想将它“标准化”如下:

  1. 计算每个 col 的均值和 sd
  2. 对于每个 col,从每个条目中减去列平均值,然后将结果除以列 sd

这可以有效且轻松地完成(无需在任何阶段将 RDD 转换为双数组)吗?

谢谢并恭祝安康,

0 投票
1 回答
1329 浏览

java - Spark-无法使用 first() 访问 JavaRDD 中的第一个元素

使用spark及其JavaAPI。我已将数据加载到JavaRDD<CustomizedDataStructure>这样的位置:

当我这样做时:

它向我返回值,表明它确实包含数据,而不是nullRDD。但是在运行时:

它应该返回我 a <CustomizedDataStructure>,但它给出了这样的错误:

为什么不是serializable

0 投票
2 回答
4949 浏览

scala - Spark RDD——它们是如何工作的

我有一个在单节点上运行良好的小型 Scala 程序。但是,我正在扩展它,以便它在多个节点上运行。这是我的第一次这样的尝试。我只是想了解 RDD 在 Spark 中是如何工作的,所以这个问题是基于理论的,可能不是 100% 正确的。

假设我创建了一个 RDD: val rdd = sc.textFile(file)

现在,一旦我这样做了,这是否意味着文件file现在跨节点分区(假设所有节点都可以访问文件路径)?

其次,我想计算 RDD 中的对象数量(足够简单),但是,我需要在需要应用于 RDD 中的对象的计算中使用该数字 - 一个伪代码示例:

假设有 100 个对象rdd,假设有 10 个节点,因此每个节点计数 10 个对象(假设这是 RDD 概念的工作原理),现在当我调用该方法时,每个节点将使用rdd.sizeas执行计算10还是100?因为,总体而言,RDD 是大小100,但在每个节点上本地它只是10. 在进行计算之前,我是否需要制作广播变量?这个问题与下面的问题有关。

最后,如果我对 RDD 进行转换,例如rdd.map(_.split("-")),然后我想要新size的 RDD,我是否需要对 RDD 执行操作,例如count(),以便将所有信息发送回驱动节点?

0 投票
2 回答
5917 浏览

apache-spark - 如何更新 RDD?

我们正在开发 Spark 框架,其中我们将历史数据移动到 RDD 集中。

基本上,RDD 是不可变的,我们在其上进行操作的只读数据集。基于此,我们将历史数据移动到 RDD 中,并在此类 RDD 上进行过滤/映射等计算。

现在有一个用例,其中 RDD 中的数据子集被更新,我们必须重新计算这些值。

HistoricalData 采用 RDD 的形式。我根据请求范围创建另一个 RDD,并将该 RDD 的引用保存在ScopeCollection中

到目前为止,我已经能够想到以下方法 -

方法1:广播变化:

  1. 对于每个更改请求,我的服务器都会获取特定于范围的 RDD 并生成一个作业
  2. 在作业中,在该 RDD 上应用映射阶段 -

    2.a。对于 RDD 中的每个节点,在广播中查找并创建一个现在已更新的新值,从而创建一个新的 RDD
    2.b。现在我在 step2.a 再次对这个新的 RDD 进行所有计算。像乘法,减少等
    2.c。我将此 RDD 引用保存回我的ScopeCollection

方法2:为更新创建一个RDD

  1. 对于每个更改请求,我的服务器都会获取特定于范围的 RDD 并生成一个作业
  2. 在每个 RDD 上,与具有更改的新 RDD 进行联接
  3. 现在我在第 2 步再次对这个新的 RDD 进行所有计算,例如乘法、减法等

方法3:

我曾想过创建流式 RDD,在其中不断更新相同的 RDD 并进行重新计算。但据我了解,它可以从 Flume 或 Kafka 获取流。而在我的情况下,这些值是根据用户交互在应用程序本身中生成的。因此,在我的上下文中,我看不到任何流式 RDD 的集成点。

关于哪种方法更好或适合这种情况的任何其他方法的任何建议。

蒂亚!

0 投票
2 回答
4085 浏览

scala - 可以使用 reduceBykey 来更改类型和组合值 - Scala Spark?

在下面的代码中,我试图组合值:

reduceByValue应该包含 (a , 1,3,2) 但收到编译时错误:

什么决定了reduce函数的类型?不能转换类型吗?

我可以groupByKey用来达到相同的结果,但只是想了解reduceByKey

0 投票
1 回答
1948 浏览

java - Spark 在有 2 个工作人员的集群上的 JdbcRDD 中引发 NullPointerException

我正在运行带有 2 个 Worker 的 spark 集群,每个 60GB。

我为 JdbcRDD 编写了以下代码。

GetJdbcResult 的代码是

}

上面的代码运行良好,我在独立模式(本地 *)下运行 Spark,但如果使用集群环境,则会抛出以下错误:

驱动程序堆栈跟踪:

Worker 日志上没有任何跟踪/日志。我在这里做错了吗?有人知道吗?