问题标签 [spark-dataframe]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - JavaPairRDD 到 Apache Spark 中的 DataFrame 与 java
我有一些文档文件,我尝试读取数据,然后使用 zipWithIndex() 函数使用索引压缩如下:
之后 zipIndex 的值是一个包含键值对的 JavaPairRDD,看起来像 ["This is the beautiful picture", 0], ["This is another picture", 1]。
但是现在,我想使用以下方法将 zipIndex 转换为 DataFrame:
函数 createDataFrame 不接受参数 zipIndex(TextId 是具有 2 个属性的类:String text 和 int docId)。
我在 Scala 中也有一个代码,它运行得很好。请参考这个:
如果有什么解决办法。请帮我解决。谢谢
scala - 联合两个 RDD Spark scala,保持右侧
我有两个火花数据框,具有以下结构。如使用 sqlContext 之前所读。
我想要基于复合键 (id_location,id_item) 的以下结果
所以,我想要一个具有不同itens的结果(关于复合键),但是当我在两个rdds中找到具有相同键的记录时,我只想保留来自rdd2的记录。
有人有这种要求吗?
我正在使用 spark 和 scala。
最好的问候拉斐尔。
apache-spark - 为什么 pyspark 会选择未广播的变量?
我正在使用 pyspark 分析数据集,我有点惊讶为什么即使我使用的是未广播的变量,以下代码也能正常工作。
有问题的变量是video
,在函数中使用filter
,在连接之后。
我在独立模式下使用 pyspark,pyspark-submit 有以下参数:
--num-executors 12 --executor-cores 4 --executor-memory 1g --master local[*]
另外,我在jupyter(新的 ipython-notebooks)上运行之前的代码。
python - 在火花数据框中使用凤凰动态列
Phoenix 能够在为表选择初始模式后添加动态列。您可以在 upserts 期间动态添加列。我的问题是如何有效地使用这些列,尤其是在 spark 数据帧中?我了解连接 spark 数据框 phoenix 时具有架构,并且我知道动态列不在架构中。那么这可能吗?
python - 用户定义的函数会破坏 pyspark 数据框
我的 spark 版本是 1.3,我正在使用 pyspark。
我有一个名为 df 的大型数据框。
然后我选择数据框的几列并尝试计算行数。这工作正常。
然后我应用用户定义的函数将其中一列从字符串转换为数字,这也可以正常工作
但是,如果我尝试计算行数,即使该类型显示它是一个数据框,就像 df3 一样,我也会得到一个异常。
我的错误:
我是否正确使用了用户定义的函数?知道为什么数据框功能不适用于数据框吗?
unicode - 带有架构的行中的 sqlContext.createDataframe。pyspark:TypeError:IntegerType 不能接受类型中的对象
在花了很多时间弄清楚为什么我收到以下错误之后
在尝试基于行和模式创建数据框时,我注意到以下内容:
在我的 rdd 中有一个名为 rrdRows 的 Row,如下所示:
我的 dfSchema 定义为:
创建一个数据框如下:
带来了上面提到的Error,因为Spark只考虑了StructFields在schema中的顺序,并没有将StructFields的名称与Row字段的名称进行匹配。
换句话说,在上面的示例中,我注意到 spark 尝试创建一个如下所示的数据帧(如果不存在 typeError。例如,如果所有内容都是 String 类型)
这真的是预期的,还是某种错误?
编辑: rddRows 是按照这些思路创建的:
其中 rddDict 是已解析的 JSON 文件。
apache-spark - Python API 中是否提供 Spark SQL UDAF(用户定义的聚合函数)?
从 Spark 1.5.0 开始,似乎可以为 DataFrames 上的自定义聚合编写自己的 UDAF: Spark 1.5 DataFrame API 亮点:日期/时间/字符串处理、时间间隔和 UDAF
但是,我不清楚 Python API 是否支持此功能?
python - 将二进制数据读入(py)spark DataFrame
我正在将一个二进制文件摄取到 Spark 中——文件结构很简单,它由一系列记录组成,每条记录都包含许多浮点数。目前,我正在 python 中以块的形式读取数据,然后遍历各个记录以将它们转换为Row
Spark 可以用来构造DataFrame
. 这是非常低效的,因为不是以块的形式处理数据,而是需要我遍历各个元素。
是否有一种明显(首选)的方式来摄取这样的数据?理想情况下,我将能够将文件的一部分(比如 10240 条记录左右)读入缓冲区,指定架构并将其DataFrame
直接转换为。我看不到使用当前 API 执行此操作的方法,但也许我遗漏了一些东西?
这是一个演示当前过程的示例笔记本:https ://gist.github.com/rokroskar/bc0b4713214bb9b1e5ed
理想情况下,我将能够摆脱 for 循环buf
,直接将整个批次转换为对象read_batches
数组。Row
scala - 在Scala中将来自不同数据帧的行合并在一起
例如首先我有一个这样的数据框
我们有 2012 年、1997 年和 2015 年。我们还有另一个这样的 Dataframe
我们还有 2012 年、1997 年、2015 年。我们如何将同一年的行合并在一起?谢谢
输出应该是这样的
scala - Spark DataFrame 上的并发操作
我需要对 DataFrame 和计数进行不同的过滤操作,然后对单个计数进行求和。我使用 Scala Future 进行并发执行。这是代码:
每个过滤器/计数操作的运行时间大约需要 7 秒。但是,运行多次后,并发执行的总时间总是在 35 秒左右,而不是我预期的 7 秒。我对这种行为困惑了很长一段时间,但无法弄清楚。
我有一个由 3 台机器组成的集群,一个主节点,两个工作节点,每个节点有 128G 内存和 32 个内核。数据大小约为3G。我注意到在并发执行期间,一个工作节点有 20 秒的 GC 时间。我已经调整了 GC,使得单个过滤器/计数操作几乎没有 GC 时间。我不确定为什么每当我运行 3 个 Future 的并发执行时 GC 就会启动,以及是否是导致并发执行时间更长的原因。
有人在这个问题上有经验吗?