问题标签 [apache-spark-encoders]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
319 浏览

scala - 在 Spark 数据集中添加 ADT 列?

我想创建一个包含 ADT 列的数据集。基于这个问题:Encode an ADT / seal trait hierarchy into Spark DataSet column 我知道,有一个用 kryo 编码的解决方案,但这并没有真正的帮助。还有另一种更好的方法来解决这个问题。让我们定义以下 ADT:

并定义一个使用的案例类Animal

我现在可以轻松地从 Pet 创建一个数据集

请注意,声音是 a Struct,但提取元素很简单:

实际上这是我想要实现的最终结构。我面临两个问题。

  1. 通常从案例类继承不是一个好主意
  2. 详尽的模式匹配要求默认情况

问题 2 的示例:

为了克服问题 2,我想创建一个密封的抽象类。我也想把它做成产品

现在解决了问题 2,不再需要默认情况。但是我无法从 Animal 创建数据集。我得到以下异常: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: Couldn't find sound on class Animal

我真正想要获得的是获得与Option. 我们可以创建一个包含可选字段的案例类:

我检查了Option的实现,它也是一个密封的抽象类,所以我错过了什么?数据集的选项是如何编码的?

更新

抱歉, Option的最后一部分在这里没有太大意义,因为您需要在数据集中最后明确地写入您希望看到的值。

但问题仍然存在,我如何使用正确的模式匹配对从 ADT 创建的列进行编码。

0 投票
1 回答
576 浏览

apache-spark - 在 Spark Dataset mapGroups 操作甚至在函数中返回一个字符串之后,值类型是二进制的

环境:

spark 应用程序尝试执行以下操作

1) 将输入数据转换为数据集[GenericRecord]

2) 按 GenericRecord 的键属性分组

3)在分组后使用mapGroups迭代值列表并以字符串格式获得一些结果

4) 将结果作为字符串输出到文本文件中。

写入文本文件时发生错误。Spark 推断步骤 3 中生成的 Dataset 具有二进制列,而不是 String 列。但实际上它在 mapGroups 函数中返回一个字符串。

有没有办法进行列数据类型转换或让 Spark 知道它实际上是一个字符串列而不是二进制?

输出说它是一个垃圾箱

Spark 给出一个错误,它不能将二进制写入文本:

0 投票
1 回答
442 浏览

scala - 错误:找不到类型 org.apache.spark.sql.Dataset[(String, Long)] 的编码器

以下数据集比较测试失败并出现错误:

测试

如您所见,我尝试为 (String, Long) 创建 Kryo 编码器

正在测试的 Spark 应用程序

0 投票
1 回答
159 浏览

apache-spark - 如何将数据框转换为数据集,将父类的对象引用作为另一个类中的组合?

我正在尝试将 a 转换Dataframe为 a Dataset,java类结构如下:

A类:

B类:

和 C 类

数据框中的数据如下:

当我尝试将 Encoders.bean[C](classOf[C]) 应用于数据帧时。当我检查 .isInstanceOf[B] 时,作为类中A实例的对象引用未返回 true,我将其视为 false。数据集的输出如下:BC

我们如何在 foreach 中迭代 C 对象时获取 A 和 B 的所有字段?

代码 :-

文件内容:{"a":[{"a":1,"b":2}]}

0 投票
1 回答
1000 浏览

scala - Spark Dataframe - 编码器

我是 Scala 和 Spark 的新手。

我正在尝试使用编码器从 Spark 读取文件,然后转换为 java/scala 对象。

使用 as 读取应用架构和编码的文件的第一步工作正常。

然后我使用该数据集/数据框进行简单的地图操作,但如果我尝试在结果数据集/数据框上打印模式,它不会打印任何列。

另外,当我第一次读取文件时,我没有在 Person 类中映射 age 字段,只是为了在 map 函数中计算它来尝试 - 但我没有看到那个 age 根本没有使用 Person 映射到数据框.

Person.txt 中的数据:

以下是代码:

0 投票
1 回答
463 浏览

apache-spark - 编码后无法对自定义类型进行操作?火花数据集

假设你有这个(编码自定义类型的解决方案来自这个线程):

当做 a 时ds.show,我得到:

我知道这是因为内容被编码为内部 Spark SQL 二进制表示。但是我怎样才能像这样显示解码后的内容呢?


更新1

显示内容不是最大的问题,更重要的是它在处理数据集时可能会导致问题,考虑这个例子:

这是否意味着,kryo-encoded 类型不能joinWith方便地进行操作?

那么我们如何处理自定义类型Dataset
如果我们在编码后无法处理它,那么这种kryo自定义类型的编码解决方案有什么意义?!

(下面@jacek 提供的解决方案很好了解case class类型,但它仍然无法解码自定义类型

0 投票
0 回答
202 浏览

apache-spark - Spark SQL 中的数据类型(UDT)与编码器

在 Spark SQL 中,用于的 s有限DataType,用于将 JVM 对象转换为内部 Spark SQL 表示和从内部 Spark SQL 表示转换的 s 是有限的。SchemaEncoder

java.lang.UnsupportedOperationException:不支持类型 xxx 的架构

  • 在实践中,我们可能会遇到类似这样的错误Encoder,这通常发生在Dataset[T]带有自定义类型的 a 中,但不会发生在带有自定义类型的 aDataFrame中。讨论Encoder如何在数据集中存储自定义对象?

找不到存储在数据集中的类型的编码器。通过导入 sqlContext.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持

据我了解,两者都涉及到内部 Spark SQL 优化器(这就是为什么只提供有限数量的DataTypes 和Encoders 的原因);并且两者DataFrameDataset只是Dataset[A],那么..

问题(或更多..混淆)

  • 为什么第一个错误只出现在DataFrame而不出现在Dataset[T]?第二个错误的同样问题......

  • 创建可以UDT解决第二个错误吗?创建编码器可以解决第一个错误吗?

  • 我应该如何理解它们之间的关系,以及它们如何与DatasetSpark SQL 引擎交互?

这篇文章的倡议是在这两个概念上进行更多的探索并吸引公开讨论,所以如果问题不是太具体,请多多包涵……感谢任何理解分享。谢谢。

0 投票
1 回答
98 浏览

scala - 没有 Scala 的联合泛型类型

这工作正常:

这也很好:

但是,我们如何实现这一点以返回类型 A 或 B?

是否有可能拥有泛型类型AB联合类型?干杯。


更新1

Either是一个选项但并不理想,因为它在处理返回的结果时需要模式匹配。实际上,我想要这个:A <: A|B, B <: A|BEither但没有实现。

另一个极端,我可以这样做,但是类型太松了:


UPDATE2
为什么我不想要Either

(原因 2)

返回的结果实际上是 a Datasetof Spark,这需要Encoder任何不是子类型的类型Product(请参阅本主题)。

多次调用此方法时,最终会出现太多的Either相互包装,例如Either[Either[Either[...]]]. 这需要定义太多的编码器。

所以我实际上是这样做的:

如果我这样做,由于类型的不同,它需要有许多不同的编码器Either

0 投票
0 回答
64 浏览

scala - 使用 kryo 期望在 Scala Spark 中编码 Guava 缓存

我正在使用 GuavaCache (带有scalacache 包装器),我想在一些火花管道中存储和使用整个缓存对象。

我使用 kryo 编码器进行编码SomeService,这是一个包含缓存的案例类。运行代码时,抛出以下异常:

Guava 缓存是可序列化的,因此我认为 Kryo 编码器应该能够对其进行编码。LocalManualCache类也被定义为Serializable

  • 当我直接使用番石榴缓存时同样的错误
  • 我不想使用asMap,因为每条记录的 TTL 都会丢失。

我可以以某种方式手动编码和解码 gauva 缓存对象吗?还是不可能使用自定义缓存?

0 投票
1 回答
40 浏览

java - 用于不可变数据类型的 spark sql 编码器

在编写 java 代码时,我通常使用不可变值类型。有时它是通过库(Immutables、AutoValue、Lombok),但大多只是普通的 java 类:

  • 所有final领域
  • 以所有字段为参数的构造函数

(鉴于当前的 spark 支持,此问题适用于 java 11 及更低版本)。

在 Spark Sql 中,数据类型需要一个Encoder. 使用现成的编码器Encoder.bean(MyType.class),如使用这种不可变的数据类型会导致“非法反射访问操作”。

我很好奇 spark sql (dataset) 方法是什么。显然我可以放松这一点,让它成为一个可变的 pojo。


更新

查看Encoders.bean它的代码确实必须是一个经典的、可变的 POJO。反射代码寻找合适的设置器。此外(并且记录在案)唯一支持的集合类型是array,listmap(not set)。