问题标签 [apache-spark-dataset]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
678 浏览

scala - dataset flatmap groupBy 缺少参数类型

我有一个包含数字列表的列的数据集。

我想计算所有这些列表中每个数字的出现次数。所以我做了一个flatMap,得到一组所有整数。我想对它进行分组,所以我每个数字只有一次,然后添加出现次数(在第二列或其他内容中)。到目前为止我的代码:

但它总是说,“i”缺少参数类型。我想我需要告诉它是一个 Int,但我该怎么做呢?还是我错过了完全不同的东西?

0 投票
0 回答
3461 浏览

scala - 尝试在 Spark 2.0 中的 DataFrame 上执行 flatMap 时,找不到存储在 Dataset 中的类型的编码器

我不断收到以下编译时错误:

我刚从 Spark v1.6 升级到 v2.0.2,一大堆代码DataFrame都在抱怨这个错误。它抱怨的代码如下所示。

以前的 SO 帖子建议

但是,我没有任何 case 类,因为我使用DataFrame的是等于DataSet[Row],而且,我已经按如下方式内联了 2 个隐式导入,而没有任何帮助来消除此消息。

请注意,我查看了DataSetEncoder的文档。文档说如下。

但是,我的方法无法访问SparkSession. 另外,当我尝试那条线时import spark.implicits._,IntelliJ 甚至找不到它。当我说我的 DataFrame 是 DataSet[Row] 时,我是认真的。

这个问题被标记为可能重复,但请让我澄清一下。

  • 我没有关联的案例类或业务对象。
  • 我正在使用 .flatMap 而另一个问题是使用 .map
  • 隐式导入似乎没有帮助
  • 传递 RowEncoder 会产生编译时错误,例如data.flatMap(row => { ... }, RowEncoder(data.schema))(参数过多)

我正在阅读其他帖子,让我补充一下,我想我不知道这个新的 Spark 2.0 Datasets/DataFrame API 应该如何工作。在 Spark shell 中,下面的代码有效。请注意,我像这样启动 spark shell$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

但是,当我将其作为测试单元的一部分运行时,我会遇到同样的无法找到编码器错误。为什么这在 shell 中有效,但在我的测试单元中无效?

在 shell 中,我输入:imports并将:implicits它们放在我的 scala 文件/源中,但这也无济于事。

0 投票
2 回答
1925 浏览

apache-spark-dataset - 空指针异常 - Apache Spark 数据集左外连接

我正在尝试学习火花数据集(火花 2.0.1)。左外连接下方正在创建空指针异常。

2014 年 16 月 12 日 16:48:26 错误执行程序:阶段 2.0 (TID 12) 中任务 0.0 中的异常 java.lang.NullPointerException

这是因为在进行左外连接时,它为 record._2.depname 提供了空值。

如何处理?谢谢

0 投票
1 回答
2082 浏览

apache-spark - Spark 动态 DAG 比硬编码的 DAG 慢很多并且不同

我在 spark 中有一个操作,应该对数据框中的几列执行。通常,有 2 种可能性来指定此类操作

  • 硬编码
  • 从列名列表动态生成它们

问题是动态生成的 DAG 是不同的,并且当使用更多列时,动态解决方案的运行时间会比硬编码操作增加得更多。

我很好奇如何将动态构造的优雅与快速执行时间结合起来

这是示例代码的 DAG 的比较 复杂度比较

对于大约 80 列,这为硬编码的变体生成了一个相当不错的图形, 硬编码 并且对于动态构造的查询来说,这是一个非常大的、可能不太可并行化且速度较慢的 DAG。 巨大的混乱动态

当前版本的 spark (2.0.2)DataFrames与 spark-sql一起使用

完成最小示例的代码:

编辑

运行您的任务会foldleft生成一个线性 DAG 向左折叠 ,并对所有列的函数进行硬编码会导致 硬编码

两者都比我原来的 DAG 好很多,但硬编码的变体对我来说看起来更好。在 spark 中连接 SQL 语句的字符串可以让我动态生成硬编码的执行图,但这看起来相当难看。你看到任何其他选择吗?

0 投票
2 回答
16227 浏览

scala - Spark Dataframes - 关键减少

假设我有一个像这样的数据结构,其中 ts 是一些时间戳

给定大量这些记录,我希望得到每个 id 时间戳最高的记录。使用 RDD api,我认为以下代码可以完成工作:

同样,这是我对数据集的尝试:

我一直在尝试解决如何使用数据框实现类似的功能,但无济于事-我意识到我可以使用以下方法进行分组:

但这给了我一个 RelationGroupedDataSet 并且我不清楚我需要编写什么聚合函数来实现我想要的 - 我见过的所有示例聚合似乎都专注于只返回一个被聚合的列而不是整行。

是否可以使用数据框来实现这一点?

0 投票
1 回答
3037 浏览

apache-spark - 如何针对 Spark DataFrame 并行化/分布查询/计数?

我有一个DataFrame我必须对其应用一系列过滤器查询。例如,我DataFrame按如下方式加载我的。

然后我有一堆“任意”过滤器,如下所示。

  • C0='真' 和 C1='假'
  • C0='假' 和 C3='真'
  • 等等...

我通常使用 util 方法动态获取这些过滤器。

我所做的就是将这些过滤器应用于DataFrame以获取计数。例如。

我注意到映射过滤器时这不是并行/分布式操作。如果我将过滤器粘贴到 RDD/DataFrame 中,这种方法也不起作用,因为我将执行嵌套数据帧操作(正如我在 SO 上所读到的,Spark 中不允许这样做)。类似以下的内容会产生 NullPointerException (NPE)。

有没有办法DataFrame在 Spark 上并行化/分布计数过滤器?顺便说一句,我在 Spark v2.0.2 上。

0 投票
2 回答
15719 浏览

apache-spark - Spark DataSet 过滤器性能

我一直在尝试不同的方法来过滤类型化的数据集。事实证明,性能可能完全不同。

该数据集是基于 33 列和 4226047 行的 1.6 GB 数据行创建的。DataSet 通过加载 csv 数据创建并映射到案例类。

UnitId = 'B02' 上的过滤器应返回 47980 行。我测试了以下三种方法:1)使用类型列(本地主机上约 500 毫秒)

2)使用临时表和sql查询(〜与选项1相同)

3)使用强类型类字段(14,987ms,即慢30倍)

我用python API再次测试了一下,同样的数据集,时间为17046ms,与scala API option 3的性能相当。

有人可以阐明 3) 和 python API 的执行方式与前两个选项的不同吗?

0 投票
2 回答
849 浏览

scala - 如何将数据框中的空值填充到uuid?

有一列中有一个空值的数据框(不是全部为空),它需要用uuid填充空值,有没有办法?

我试过这种方式,但“field2”的每一行都有相同的uuid。

怎么做?如果有超过 1,000,000 行

0 投票
1 回答
638 浏览

apache-spark - DataSet 相对于 RDD 的性能优势

在阅读了几篇关于 Spark 数据集的精彩文章(thisthisthis)之后,我总结了下一个 DataSet 相对于 RDD 的性能优势:

  1. 逻辑和物理计划优化;
  2. 严格的类型化;
  3. 矢量化操作;
  4. 低级内存管理。

问题:

  1. Spark 的 RDD 还构建了物理计划,可以在同一阶段组合/优化多个转换。那么DataSet 相对于 RDD 有什么好处呢?
  2. 第一个链接你可以看到一个例子RDD[Person]DataSet 有高级类型化吗?
  3. “矢量化操作”是什么意思?
  4. 据我了解,DataSet 的低内存管理 = 高级序列化。这意味着可序列化对象的堆外存储,您可以在其中仅读取对象的一个​​字段而无需反序列化。但是当你有IN_MEMORY_ONLY持久性策略的情况下呢?无论如何,DataSet 会序列化所有内容吗?它会比 RDD 有任何性能优势吗?
0 投票
1 回答
3308 浏览

scala - Spark 2.0 隐式编码器,当类型为 Option[Seq[String]] 时处理缺失列(scala)

当我们的数据源中缺少某些类型为 Option[Seq[String]] 的列时,我在编码数据时遇到了一些问题。理想情况下,我希望用None.

设想:

我们有一些正在读取的镶木地板文件,其中包含column1但没有column2

我们将这些 parquet 文件中的数据加载到 aDataset中,并将其转换为MyType.

org.apache.spark.sql.AnalysisException:无法解析“ column2”给定的输入列:[column1];

有没有办法使用 column2 数据创建数据集None