问题标签 [apache-spark-dataset]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
7088 浏览

apache-spark - 具有正确可空性的案例类的 Spark 模式

对于自定义 Estimator 的 transformSchema 方法,我需要能够将输入数据框的模式与案例类中定义的模式进行比较。通常这可以像从案例类中生成 Spark StructType / Schema一样执行,如下所述。但是,使用了错误的可空性:

由 df 推断的真实模式spark.read.csv().as[MyClass]可能如下所示:

案例类:

为了比较我使用:

不幸的是,这总是产生false,因为从案例类手动推断的新模式设置为可空true(因为 ja java.Integer 实际上可能为空)

nullable = false创建架构时如何指定?

0 投票
0 回答
87 浏览

apache-spark - Apache Spark 数据集通过将旧值与新值相加来在数据库中保存新的更新值

我的堆栈是 Apache Spark 2.0、Spark Cassandra Java 连接器、Cassandra 3.7

这是我的要求。

我通过将列族数据提取到我的 Java 程序中创建了一个数据集 (ds1)。下一步是将新数据从外部源获取到新数据集 (ds2)。两个数据集的架构完全相同。但是现在的问题是,在将新值从 ds2 保存到数据库中时,我想以这样的方式更新列,使其先前的值 (ds1.col1) 将添加新值 (ds2.col1),然后保存数据库的新更新值,即 (ds2.col1 + ds1.col1)。

目前我通过以下方式获取 ds1.col1 的值

据我了解 takeAsList() 它对性能有影响。另一种有效的方法是什么?请让我知道您的意见。如果您需要更多信息,请告诉我。

提前致谢。

0 投票
1 回答
336 浏览

apache-spark - 为 Performance Tuning、outOfMemoryError Gc 和 GC limit 设置 spark 参数

我有 5 种模式,每种模式都有 8 个 cpu 内核和 30GB 内存。我有 5GB 的 dataset1 和 65GB 的 dataset2,所以我需要加入它们并保存到 s3 文件系统,我设置了以下参数:

但是,我到处搜索并尝试了不同的参数,但仍然出现内存不足错误。超出 Gc 开销限制。

在 Java 代码中也使用了这个:

stackoverflow 中没有任何解决方案对我有用。所以任何人都可以给我一个更好的解决方案来摆脱这个问题。

有一段时间我得到这个:

0 投票
2 回答
950 浏览

scala - Spark 2.0.0:如何使用自定义编码类型聚合 DataSet?

我有一些数据存储为 DataSet[(Long, LineString)] 使用元组编码器和用于 LineString 的 kryo 编码器

我可以在段列上应用任何映射操作并使用底层的 LineStrign 方法。

我想创建一些 UDAF 来处理具有相同 id 的段,我尝试了以下两种不同的方法,但均未成功:

1)使用聚合器:

在这里,我收到以下错误:

2) 使用 UserDefinedAggregateFunction

我究竟做错了什么?我想将聚合 API 与自定义类型一起使用,而不是使用 rdd groupBy API。我搜索了 Spark 文档,但找不到这个问题的答案,目前似乎还处于早期阶段。

谢谢。

0 投票
1 回答
1530 浏览

apache-spark - 如何在 Apache Spark 上进行非随机数据集拆分?

我知道我可以使用 randomSplit 方法进行随机拆分:

我可以使用一些“nonRandomSplit 方法”将数据分成连续的部分吗?

Apache Spark 2.0.1。提前致谢。

UPD:数据顺序很重要,我将在“较小 ID”的数据上训练我的模型,并在“较大 ID”的数据上对其进行测试。所以我想把数据分成连续的部分而不用改组。

例如

我能想到的唯一解决方案是使用countlimit,但可能有更好的解决方案。

0 投票
1 回答
976 浏览

scala - Apache Spark 加入动态重新分区

我试图在两个表上做一个相当简单的连接,没什么复杂的。加载两个表,进行连接和更新列,但它不断抛出异常。

我注意到任务卡在最后一个分区上199/200并最终崩溃。我怀疑数据是倾斜的,导致所有数据都加载到最后一个分区199中。

SELECT COUNT(DISTINCT report_audit) FROM ReportDs = 1.5million.

尽管

SELECT COUNT(*) FROM ReportDs = 57million.

集群详情:CPU:40核,内存:160G。

这是我的示例代码:

我认为应该有一种优雅的方式来处理这种数据偏斜。

0 投票
1 回答
993 浏览

scala - Spark DataFrame 映射错误

对不起,我需要再问一个问题。我希望这个不要重复。我编辑了最后一个,但我认为没有人看到编辑后的版本。这是问题的一个简短示例:

错误:

我想要做的是,修改每一行。在这种情况下,我知道只有 1 列,我可以在尝试将数据帧行映射到更新的行时像编码器错误一样处理它。但是,如果我有数百列,我该如何解决这个问题?如果某些行不满足条件,我想删除它们。目前我使用:

但我不认为这是最好的解决方案。我也在StackoverflowError中运行:

TY寻求帮助:)

0 投票
0 回答
1042 浏览

scala - 如何根据到前一个有效行的距离从 Spark Dataframe 中过滤异常行

我正在使用具有以下架构的 Spark 2.0 Dataframes 处理地理空间数据:

我已经看到位置信号跳到了完全不同的地方。奇怪的是,信号会保持一段时间,比如在远程位置大约 25 秒或 5 个样本,然后跳回我所在的位置。

我想通过计算点之间的速度来计算当前和最后一个有效行之间的速度来消除异常值。如果速度高于给定阈值,则应删除当前行,最后一个有效行保持不变。如果速度低于阈值,则将当前行添加到结果数据框中并成为新的最后一个有效行。

我试图通过使用 mapPartition 函数来实现这一点:

但我收到以下错误:

欢迎任何其他方法。

0 投票
2 回答
2793 浏览

scala - 将 ADT / 密封特征层次结构编码到 Spark DataSet 列中

如果我想在 Spark DataSet列中存储代数数据类型 (ADT)(即 Scala 密封特征层次结构),最好的编码策略是什么?

例如,如果我有一个叶类型存储不同类型数据的 ADT:

构建一个的最佳方法是什么:

0 投票
1 回答
3804 浏览

scala - 将 Spark 数据集作为函数参数传递

我想将 Spark Dataset 作为参数传递给函数。例如

我想将不同的数据集传递给 printDS()。由于 Spark 数据集是强类型的,如何将 Dataset[Token1] 或 Dataset[Token2] 传递给接受 Dataset[Any] 的 printDS()?我可以将 Spark Dataframes 作为函数参数传递,而不是 Spark Dataset。