问题标签 [spark-dataframe]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
4 回答
87143 浏览

scala - 从分区拼花文件中读取 DataFrame

如何读取条件为数据框的分区镶木地板,

这工作正常,

分区用于day=1 to day=30是否可以读取类似(day = 5 to 6)or的内容day=5,day=6

如果我把*它给我所有 30 天的数据,它太大了。

0 投票
0 回答
498 浏览

r - SparkR:DataFrame 上的 dplyr 样式拆分应用组合

在之前的 RDD 范式下,我可以指定一个键,然后将一个操作映射到每个键对应的 RDD 元素。从 1.5.1 开始,我看不到使用 SparkR 中的 DataFrame 执行此操作的明确方法。我想做的是类似dplyr操作:

我目前有一个大型 SparkR DataFrame 的形式:

我按id和排序timestamp

我想分组id,但我不想聚合。相反,我想对每个组进行一组转换和计算——例如,插值以填充 NA(当我collect使用 DataFrame 然后转换value为数字时生成)。我已经使用 进行了测试agg,但是虽然我的计算确实可以运行,但没有返回结果,因为我没有在 中返回单个值myfunc

请注意,myfunc当我filter将 DataFrame 缩减为单个id并运行它时,所有操作都可以正常工作。根据运行所需的时间(每个任务大约 50 秒)和没有抛出异常的事实,我相信myfunc确实在所有的ids 上运行——但我需要输出!

任何输入将不胜感激。

0 投票
1 回答
4255 浏览

apache-spark - 在火花流中处理数据库连接

我不确定我是否正确理解 spark 处理数据库连接的方式以及如何在 spark 内部使用大量数据库更新操作可靠,而不会破坏 spark 工作。这是我一直在使用的代码片段(为了便于说明):

基本上,我使用 Reactive Mongo (Scala),对于每个 RDD,我将其转换为数据框,分组/提取必要的数据,然后针对 mongo 触发大量数据库更新查询。我想问一下:

  1. 我正在使用 mesos 在 3 台服务器上部署 spark 并为 mongo 数据库再配备一台服务器。这是处理数据库连接的正确方法吗?我担心的是数据库连接/轮询是否在火花作业开始时打开并在火花的整个持续时间(几周,几个月......)期间正确维护(尽管超时/网络错误故障转移),以及它是否会在每个批完了?鉴于作业可能被安排在不同的服务器上?这是否意味着每批,都会打开不同的数据库连接集?

  2. 如果在执行查询时发生异常会发生什么。该批次的火花作业会失败吗?但是下一批会继续吗?

  3. 如果在 mongo-database 上运行 update 的查询太多(2000->+),并且执行时间超过了配置的 spark 批处理持续时间(2 分钟),会导致问题吗?我注意到,在我当前的设置中,大约 2-3 天后,所有批次都在 Spark WebUI 上作为“进程”排队(如果我禁用 mongo 更新部分,那么我可以运行一周而不会出现问题),没有能够正常退出。这基本上会挂起所有批处理作业,直到我重新启动/重新提交作业。

非常感谢。如果您能帮我解决这个问题,我将不胜感激。

0 投票
1 回答
2092 浏览

java - 从文字值创建 DataFrame 和 JavaRDD

我正在用 Java 编写 Spark 应用程序,我想知道如何从文字值创建 DataFrame 和/或 JavaRDD。

例如,我有 3 个整数,比如说(784512, 35, 40)对应于 fields / columns (id, m_count, f_count)

0 投票
2 回答
982 浏览

apache-spark - 如何将 temptable 保存到 Hive 元存储(并在 Hive 中对其进行分析)?

我使用 Spark 1.3.1。

如何将 DataFrame 数据存储/保存到 Hive 元存储?

在 Hive 中,如果我运行show tablesDataFrame 不会在 Hive 数据库中显示为表。我已经复制hive-site.xml$SPARK_HOME/conf,但它没有帮助(数据框也没有出现在 Hive 元存储中)。

我正在关注这个文档,使用 spark 1.4 版本。

如何分析 Hive 中的 spark 表?

0 投票
1 回答
6015 浏览

apache-spark - 如何控制使用 partitionBy 时生成的 parquet 文件数量

我有一个DataFrame需要根据特定分区写入 S3 的内容。代码如下所示:

partitionBy数据拆分为相当多的文件夹(~400),每个文件夹只有一点点数据(~1GB)。问题来了——因为默认值为spark.sql.shuffle.partitions200,每个文件夹中1GB的数据被分割成200个parquet小文件,总共写入了大约80000个parquet文件。由于多种原因,这不是最佳选择,我想避免这种情况。

我当然可以将 设置spark.sql.shuffle.partitions为一个小得多的数字,比如 10,但据我了解,此设置还控制连接和聚合中随机播放的分区数量,所以我真的不想更改它。

有谁知道是否有另一种方法来控制写入多少文件?

0 投票
1 回答
1041 浏览

datetime - 如何计算火花数据框中一个列的差异?

对于 spark 的数据框,我想计算日期时间的差异,就像在numpy.diff(array)

0 投票
1 回答
552 浏览

json - 与 Spark 数据帧不一致的 JSON 模式猜测

尝试使用 Spark 1.4.1 数据帧读取 JSON 文件并在其中导航。似乎猜测的架构不正确。

JSON文件是:

火花代码是:

结果是:

很明显 TUPLE_CRA 是一个数组。我不明白为什么它没有被猜到。在我看来,推断的模式应该是:

有人有解释吗?如果 JSON 模式更复杂,有没有办法轻松地告诉 Spark 实际模式是什么?

0 投票
1 回答
1947 浏览

json - Spark中DataFrame.show()方法中的java.NullPointException - scala

编辑:对以前的问题质量感到抱歉,我希望这个问题会更清楚:使用 Spark 应用程序,我正在加载以下 JSON 文件的整个目录:

进入DataFrame并保存为临时表,以便以后使用。在此 Json 中,“有效负载”节点中的字段始终存在,但“主数据”中的子节点是可选的。下一步是为 Json 的每个子节点创建多个 DataFrame,如下所示:DataFrame data1 包含来自所有文件的节点“data1”的数据,看起来像一个带有“id”列的常规表。在第一个处理部分之后,我的 Spark 状态如下: DataFrames: data1(id), data2(id), data3(id), data11(id), data12(id), md1(id), md2(id)

问题来了 - 如果目录中的一个 JSON 文件不包含 md2 节点,由于 NullPointException ,我既不能运行也show()不能在“md2”DataFrame 上运行。collect()我会理解是否所有文件都缺少“md2”节点,因此它无法创建 md2 DataFrame,但在这种情况下,我希望 md2 DataFrame 根本没有来自没有节点 md2 的 json 文件的数据,但包含所有其他文件。

技术细节:要从嵌套节点读取数据,我使用 rdd.map 和 rdd.flatmap,然后我将其转换为DataFrame自定义列名

如果我在目录中的所有文件包含所有节点时运行应用程序,一切正常,但是如果单个文件丢失 md2 节点应用程序在 .show() 或 .collect() 上失败

顺便说一句,如果节点存在但它为空,则一切正常。

有什么方法可以让 Spark 支持可选的 Json 节点或处理 rdd.map&flatmap 中缺失的节点?

我希望它比上一个问题更清楚

在@Beryllium 请求中,这里是我用来获取 md2 DataFrame 的 rdd 操作

0 投票
2 回答
2803 浏览

scala - Apache Spark:指数移动平均线

我正在用 Spark/Scala 编写一个应用程序,我需要在其中计算一列的指数移动平均值。

我面临的问题是我需要同一列的先前计算的值(EMA_t-1)。通过 mySQL,这可以通过使用 MODEL 或通过创建一个 EMA 列来实现,然后您可以更新每行的行,但我已经尝试过了,并且既不能使用 Spark SQL 也不能使用 Hive 上下文......有什么办法我可以访问这个 EMA_t-1?

我的数据如下所示:

所以我需要添加一个新列,其中我的第一个值只是第一行的价格,然后我需要使用以前的值:EMA_t = (price_t * 0.4) + (EMA_t-1 * 0.6) 来计算该列中的以下行。我的 EMA 列必须是:

我目前正在尝试使用 Spark SQL 和 Hive 来做到这一点,但如果可以用另一种方式来做到这一点,这将同样受欢迎!我还想知道如何使用 Spark Streaming 做到这一点。我的数据在数据框中,我使用的是 Spark 1.4.1。

非常感谢您提供的任何帮助!