问题标签 [spark-dataframe]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2448 浏览

java - JavaPairRDD 到 Apache Spark 中的 DataFrame 与 java

我有一些文档文件,我尝试读取数据,然后使用 zipWithIndex() 函数使用索引压缩如下:

之后 zipIndex 的值是一个包含键值对的 JavaPairRDD,看起来像 ["This is the beautiful picture", 0], ["This is another picture", 1]。

但是现在,我想使用以下方法将 zipIndex 转换为 DataFrame:

函数 createDataFrame 不接受参数 zipIndex(TextId 是具有 2 个属性的类:String text 和 int docId)。

我在 Scala 中也有一个代码,它运行得很好。请参考这个:

如果有什么解决办法。请帮我解决。谢谢

0 投票
3 回答
1698 浏览

scala - 联合两个 RDD Spark scala,保持右侧

我有两个火花数据框,具有以下结构。如使用 sqlContext 之前所读。

我想要基于复合键 (id_location,id_item) 的以下结果

所以,我想要一个具有不同itens的结果(关于复合键),但是当我在两个rdds中找到具有相同键的记录时,我只想保留来自rdd2的记录。

有人有这种要求吗?

我正在使用 spark 和 scala。

最好的问候拉斐尔。

0 投票
1 回答
742 浏览

apache-spark - 为什么 pyspark 会选择未广播的变量?

我正在使用 pyspark 分析数据集,我有点惊讶为什么即使我使用的是广播的变量,以下代码也能正常工作。

有问题的变量是video,在函数中使用filter,在连接之后。

我在独立模式下使用 pyspark,pyspark-submit 有以下参数:

--num-executors 12 --executor-cores 4 --executor-memory 1g --master local[*]

另外,我在jupyter(新的 ipython-notebooks)上运行之前的代码。

0 投票
0 回答
233 浏览

python - 在火花数据框中使用凤凰动态列

Phoenix 能够在为表选择初始模式后添加动态列。您可以在 upserts 期间动态添加列。我的问题是如何有效地使用这些列,尤其是在 spark 数据帧中?我了解连接 spark 数据框 phoenix 时具有架构,并且我知道动态列不在架构中。那么这可能吗?

0 投票
2 回答
5658 浏览

python - 用户定义的函数会破坏 pyspark 数据框

我的 spark 版本是 1.3,我正在使用 pyspark。

我有一个名为 df 的大型数据框。

然后我选择数据框的几列并尝试计算行数。这工作正常。

然后我应用用户定义的函数将其中一列从字符串转换为数字,这也可以正常工作

但是,如果我尝试计算行数,即使该类型显示它是一个数据框,就像 df3 一样,我也会得到一个异常。

我的错误:

我是否正确使用了用户定义的函数?知道为什么数据框功能不适用于数据框吗?

0 投票
1 回答
10012 浏览

unicode - 带有架构的行中的 sqlContext.createDataframe。pyspark:TypeError:IntegerType 不能接受类型中的对象

在花了很多时间弄清楚为什么我收到以下错误之后

在尝试基于行和模式创建数据框时,我注意到以下内容:

在我的 rdd 中有一个名为 rrdRows 的 Row,如下所示:

我的 dfSchema 定义为:

创建一个数据框如下:

带来了上面提到的Error,因为Spark只考虑了StructFields在schema中的顺序,并没有将StructFields的名称与Row字段的名称进行匹配。

换句话说,在上面的示例中,我注意到 spark 尝试创建一个如下所示的数据帧(如果不存在 typeError。例如,如果所有内容都是 String 类型)

这真的是预期的,还是某种错误?

编辑: rddRows 是按照这些思路创建的:

其中 rddDict 是已解析的 JSON 文件。

0 投票
1 回答
5670 浏览

apache-spark - Python API 中是否提供 Spark SQL UDAF(用户定义的聚合函数)?

从 Spark 1.5.0 开始,似乎可以为 DataFrames 上的自定义聚合编写自己的 UDAF: Spark 1.5 DataFrame API 亮点:日期/时间/字符串处理、时间间隔和 UDAF

但是,我不清楚 Python API 是否支持此功能?

0 投票
0 回答
1175 浏览

python - 将二进制数据读入(py)spark DataFrame

我正在将一个二进制文件摄取到 Spark 中——文件结构很简单,它由一系列记录组成,每条记录都包含许多浮点数。目前,我正在 python 中以块的形式读取数据,然后遍历各个记录以将它们转换为RowSpark 可以用来构造DataFrame. 这是非常低效的,因为不是以块的形式处理数据,而是需要我遍历各个元素。

是否有一种明显(首选)的方式来摄取这样的数据?理想情况下,我将能够将文件的一部分(比如 10240 条记录左右)读入缓冲区,指定架构并将其DataFrame直接转换为。我看不到使用当前 API 执行此操作的方法,但也许我遗漏了一些东西?

这是一个演示当前过程的示例笔记本:https ://gist.github.com/rokroskar/bc0b4713214bb9b1e5ed

理想情况下,我将能够摆脱 for 循环buf,直接将整个批次转换为对象read_batches数组。Row

0 投票
1 回答
631 浏览

scala - 在Scala中将来自不同数据帧的行合并在一起

例如首先我有一个这样的数据框

我们有 2012 年、1997 年和 2015 年。我们还有另一个这样的 Dataframe

我们还有 2012 年、1997 年、2015 年。我们如何将同一年的行合并在一起?谢谢

输出应该是这样的

0 投票
1 回答
1440 浏览

scala - Spark DataFrame 上的并发操作

我需要对 DataFrame 和计数进行不同的过滤操作,然后对单个计数进行求和。我使用 Scala Future 进行并发执行。这是代码:

每个过滤器/计数操作的运行时间大约需要 7 秒。但是,运行多次后,并发执行的总时间总是在 35 秒左右,而不是我预期的 7 秒。我对这种行为困惑了很长一段时间,但无法弄清楚。

我有一个由 3 台机器组成的集群,一个主节点,两个工作节点,每个节点有 128G 内存和 32 个内核。数据大小约为3G。我注意到在并发执行期间,一个工作节点有 20 秒的 GC 时间。我已经调整了 GC,使得单个过滤器/计数操作几乎没有 GC 时间。我不确定为什么每当我运行 3 个 Future 的并发执行时 GC 就会启动,以及是否是导致并发执行时间更长的原因。

有人在这个问题上有经验吗?