问题标签 [spark-dataframe]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 从分区拼花文件中读取 DataFrame
如何读取条件为数据框的分区镶木地板,
这工作正常,
分区用于day=1 to day=30
是否可以读取类似(day = 5 to 6)
or的内容day=5,day=6
,
如果我把*
它给我所有 30 天的数据,它太大了。
r - SparkR:DataFrame 上的 dplyr 样式拆分应用组合
在之前的 RDD 范式下,我可以指定一个键,然后将一个操作映射到每个键对应的 RDD 元素。从 1.5.1 开始,我看不到使用 SparkR 中的 DataFrame 执行此操作的明确方法。我想做的是类似dplyr
操作:
我目前有一个大型 SparkR DataFrame 的形式:
我按id
和排序timestamp
。
我想分组id
,但我不想聚合。相反,我想对每个组进行一组转换和计算——例如,插值以填充 NA(当我collect
使用 DataFrame 然后转换value
为数字时生成)。我已经使用 进行了测试agg
,但是虽然我的计算确实可以运行,但没有返回结果,因为我没有在 中返回单个值myfunc
:
请注意,myfunc
当我filter
将 DataFrame 缩减为单个id
并运行它时,所有操作都可以正常工作。根据运行所需的时间(每个任务大约 50 秒)和没有抛出异常的事实,我相信myfunc
确实在所有的id
s 上运行——但我需要输出!
任何输入将不胜感激。
apache-spark - 在火花流中处理数据库连接
我不确定我是否正确理解 spark 处理数据库连接的方式以及如何在 spark 内部使用大量数据库更新操作可靠,而不会破坏 spark 工作。这是我一直在使用的代码片段(为了便于说明):
基本上,我使用 Reactive Mongo (Scala),对于每个 RDD,我将其转换为数据框,分组/提取必要的数据,然后针对 mongo 触发大量数据库更新查询。我想问一下:
我正在使用 mesos 在 3 台服务器上部署 spark 并为 mongo 数据库再配备一台服务器。这是处理数据库连接的正确方法吗?我担心的是数据库连接/轮询是否在火花作业开始时打开并在火花的整个持续时间(几周,几个月......)期间正确维护(尽管超时/网络错误故障转移),以及它是否会在每个批完了?鉴于作业可能被安排在不同的服务器上?这是否意味着每批,都会打开不同的数据库连接集?
如果在执行查询时发生异常会发生什么。该批次的火花作业会失败吗?但是下一批会继续吗?
如果在 mongo-database 上运行 update 的查询太多(2000->+),并且执行时间超过了配置的 spark 批处理持续时间(2 分钟),会导致问题吗?我注意到,在我当前的设置中,大约 2-3 天后,所有批次都在 Spark WebUI 上作为“进程”排队(如果我禁用 mongo 更新部分,那么我可以运行一周而不会出现问题),没有能够正常退出。这基本上会挂起所有批处理作业,直到我重新启动/重新提交作业。
非常感谢。如果您能帮我解决这个问题,我将不胜感激。
java - 从文字值创建 DataFrame 和 JavaRDD
我正在用 Java 编写 Spark 应用程序,我想知道如何从文字值创建 DataFrame 和/或 JavaRDD。
例如,我有 3 个整数,比如说(784512, 35, 40)
对应于 fields / columns (id, m_count, f_count)
。
apache-spark - 如何将 temptable 保存到 Hive 元存储(并在 Hive 中对其进行分析)?
我使用 Spark 1.3.1。
如何将 DataFrame 数据存储/保存到 Hive 元存储?
在 Hive 中,如果我运行show tables
DataFrame 不会在 Hive 数据库中显示为表。我已经复制hive-site.xml
到$SPARK_HOME/conf
,但它没有帮助(数据框也没有出现在 Hive 元存储中)。
我正在关注这个文档,使用 spark 1.4 版本。
如何分析 Hive 中的 spark 表?
apache-spark - 如何控制使用 partitionBy 时生成的 parquet 文件数量
我有一个DataFrame
需要根据特定分区写入 S3 的内容。代码如下所示:
将partitionBy
数据拆分为相当多的文件夹(~400),每个文件夹只有一点点数据(~1GB)。问题来了——因为默认值为spark.sql.shuffle.partitions
200,每个文件夹中1GB的数据被分割成200个parquet小文件,总共写入了大约80000个parquet文件。由于多种原因,这不是最佳选择,我想避免这种情况。
我当然可以将 设置spark.sql.shuffle.partitions
为一个小得多的数字,比如 10,但据我了解,此设置还控制连接和聚合中随机播放的分区数量,所以我真的不想更改它。
有谁知道是否有另一种方法来控制写入多少文件?
datetime - 如何计算火花数据框中一个列的差异?
对于 spark 的数据框,我想计算日期时间的差异,就像在numpy.diff(array)
json - 与 Spark 数据帧不一致的 JSON 模式猜测
尝试使用 Spark 1.4.1 数据帧读取 JSON 文件并在其中导航。似乎猜测的架构不正确。
JSON文件是:
火花代码是:
结果是:
很明显 TUPLE_CRA 是一个数组。我不明白为什么它没有被猜到。在我看来,推断的模式应该是:
有人有解释吗?如果 JSON 模式更复杂,有没有办法轻松地告诉 Spark 实际模式是什么?
json - Spark中DataFrame.show()方法中的java.NullPointException - scala
编辑:对以前的问题质量感到抱歉,我希望这个问题会更清楚:使用 Spark 应用程序,我正在加载以下 JSON 文件的整个目录:
进入DataFrame
并保存为临时表,以便以后使用。在此 Json 中,“有效负载”节点中的字段始终存在,但“主数据”中的子节点是可选的。下一步是为 Json 的每个子节点创建多个 DataFrame,如下所示:DataFrame data1 包含来自所有文件的节点“data1”的数据,看起来像一个带有“id”列的常规表。在第一个处理部分之后,我的 Spark 状态如下: DataFrames: data1(id), data2(id), data3(id), data11(id), data12(id), md1(id), md2(id)
问题来了 - 如果目录中的一个 JSON 文件不包含 md2 节点,由于 NullPointException ,我既不能运行也show()
不能在“md2”DataFrame 上运行。collect()
我会理解是否所有文件都缺少“md2”节点,因此它无法创建 md2 DataFrame,但在这种情况下,我希望 md2 DataFrame 根本没有来自没有节点 md2 的 json 文件的数据,但包含所有其他文件。
技术细节:要从嵌套节点读取数据,我使用 rdd.map 和 rdd.flatmap,然后我将其转换为DataFrame
自定义列名
如果我在目录中的所有文件包含所有节点时运行应用程序,一切正常,但是如果单个文件丢失 md2 节点应用程序在 .show() 或 .collect() 上失败
顺便说一句,如果节点存在但它为空,则一切正常。
有什么方法可以让 Spark 支持可选的 Json 节点或处理 rdd.map&flatmap 中缺失的节点?
我希望它比上一个问题更清楚
在@Beryllium 请求中,这里是我用来获取 md2 DataFrame 的 rdd 操作
scala - Apache Spark:指数移动平均线
我正在用 Spark/Scala 编写一个应用程序,我需要在其中计算一列的指数移动平均值。
我面临的问题是我需要同一列的先前计算的值(EMA_t-1)。通过 mySQL,这可以通过使用 MODEL 或通过创建一个 EMA 列来实现,然后您可以更新每行的行,但我已经尝试过了,并且既不能使用 Spark SQL 也不能使用 Hive 上下文......有什么办法我可以访问这个 EMA_t-1?
我的数据如下所示:
所以我需要添加一个新列,其中我的第一个值只是第一行的价格,然后我需要使用以前的值:EMA_t = (price_t * 0.4) + (EMA_t-1 * 0.6) 来计算该列中的以下行。我的 EMA 列必须是:
我目前正在尝试使用 Spark SQL 和 Hive 来做到这一点,但如果可以用另一种方式来做到这一点,这将同样受欢迎!我还想知道如何使用 Spark Streaming 做到这一点。我的数据在数据框中,我使用的是 Spark 1.4.1。
非常感谢您提供的任何帮助!