问题标签 [apache-spark-encoders]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 在 Spark 数据集中添加 ADT 列?
我想创建一个包含 ADT 列的数据集。基于这个问题:Encode an ADT / seal trait hierarchy into Spark DataSet column 我知道,有一个用 kryo 编码的解决方案,但这并没有真正的帮助。还有另一种更好的方法来解决这个问题。让我们定义以下 ADT:
并定义一个使用的案例类Animal
我现在可以轻松地从 Pet 创建一个数据集
请注意,声音是 a Struct
,但提取元素很简单:
实际上这是我想要实现的最终结构。我面临两个问题。
- 通常从案例类继承不是一个好主意
- 详尽的模式匹配要求默认情况
问题 2 的示例:
为了克服问题 2,我想创建一个密封的抽象类。我也想把它做成产品
现在解决了问题 2,不再需要默认情况。但是我无法从 Animal 创建数据集。我得到以下异常:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: Couldn't find sound on class Animal
我真正想要获得的是获得与Option
. 我们可以创建一个包含可选字段的案例类:
我检查了Option的实现,它也是一个密封的抽象类,所以我错过了什么?数据集的选项是如何编码的?
更新
抱歉, Option的最后一部分在这里没有太大意义,因为您需要在数据集中最后明确地写入您希望看到的值。
但问题仍然存在,我如何使用正确的模式匹配对从 ADT 创建的列进行编码。
apache-spark - 在 Spark Dataset mapGroups 操作甚至在函数中返回一个字符串之后,值类型是二进制的
环境:
spark 应用程序尝试执行以下操作
1) 将输入数据转换为数据集[GenericRecord]
2) 按 GenericRecord 的键属性分组
3)在分组后使用mapGroups迭代值列表并以字符串格式获得一些结果
4) 将结果作为字符串输出到文本文件中。
写入文本文件时发生错误。Spark 推断步骤 3 中生成的 Dataset 具有二进制列,而不是 String 列。但实际上它在 mapGroups 函数中返回一个字符串。
有没有办法进行列数据类型转换或让 Spark 知道它实际上是一个字符串列而不是二进制?
输出说它是一个垃圾箱
Spark 给出一个错误,它不能将二进制写入文本:
scala - 错误:找不到类型 org.apache.spark.sql.Dataset[(String, Long)] 的编码器
以下数据集比较测试失败并出现错误:
测试
如您所见,我尝试为 (String, Long) 创建 Kryo 编码器
正在测试的 Spark 应用程序
apache-spark - 如何将数据框转换为数据集,将父类的对象引用作为另一个类中的组合?
我正在尝试将 a 转换Dataframe
为 a Dataset
,java类结构如下:
A类:
B类:
和 C 类
数据框中的数据如下:
当我尝试将 Encoders.bean[C](classOf[C]) 应用于数据帧时。当我检查 .isInstanceOf[B] 时,作为类中A
实例的对象引用未返回 true,我将其视为 false。数据集的输出如下:B
C
我们如何在 foreach 中迭代 C 对象时获取 A 和 B 的所有字段?
代码 :-
文件内容:{"a":[{"a":1,"b":2}]}
scala - Spark Dataframe - 编码器
我是 Scala 和 Spark 的新手。
我正在尝试使用编码器从 Spark 读取文件,然后转换为 java/scala 对象。
使用 as 读取应用架构和编码的文件的第一步工作正常。
然后我使用该数据集/数据框进行简单的地图操作,但如果我尝试在结果数据集/数据框上打印模式,它不会打印任何列。
另外,当我第一次读取文件时,我没有在 Person 类中映射 age 字段,只是为了在 map 函数中计算它来尝试 - 但我没有看到那个 age 根本没有使用 Person 映射到数据框.
Person.txt 中的数据:
以下是代码:
apache-spark - 编码后无法对自定义类型进行操作?火花数据集
假设你有这个(编码自定义类型的解决方案来自这个线程):
当做 a 时ds.show
,我得到:
我知道这是因为内容被编码为内部 Spark SQL 二进制表示。但是我怎样才能像这样显示解码后的内容呢?
更新1
显示内容不是最大的问题,更重要的是它在处理数据集时可能会导致问题,考虑这个例子:
这是否意味着,kryo
-encoded 类型不能joinWith
方便地进行操作?
那么我们如何处理自定义类型
Dataset
呢?
如果我们在编码后无法处理它,那么这种kryo
自定义类型的编码解决方案有什么意义?!
(下面@jacek 提供的解决方案很好了解case class
类型,但它仍然无法解码自定义类型)
apache-spark - Spark SQL 中的数据类型(UDT)与编码器
在 Spark SQL 中,用于的 s是有限DataType
的,用于将 JVM 对象转换为内部 Spark SQL 表示和从内部 Spark SQL 表示转换的 s 是有限的。Schema
Encoder
- 在实践中,我们可能会遇到类似这样的错误
DataType
,这通常发生在DataFrame
带有自定义类型的 a 中,但不会发生在带有自定义类型的 aDataset[T]
中。DataType
(或UDT
)点的讨论如何在 Spark SQL 中为自定义类型定义模式?
java.lang.UnsupportedOperationException:不支持类型 xxx 的架构
- 在实践中,我们可能会遇到类似这样的错误
Encoder
,这通常发生在Dataset[T]
带有自定义类型的 a 中,但不会发生在带有自定义类型的 aDataFrame
中。讨论Encoder
如何在数据集中存储自定义对象?
找不到存储在数据集中的类型的编码器。通过导入 sqlContext.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持
据我了解,两者都涉及到内部 Spark SQL 优化器(这就是为什么只提供有限数量的DataType
s 和Encoder
s 的原因);并且两者DataFrame
都Dataset
只是Dataset[A]
,那么..
问题(或更多..混淆)
为什么第一个错误只出现在
DataFrame
而不出现在Dataset[T]
?第二个错误的同样问题......创建可以
UDT
解决第二个错误吗?创建编码器可以解决第一个错误吗?我应该如何理解它们之间的关系,以及它们如何与
Dataset
Spark SQL 引擎交互?
这篇文章的倡议是在这两个概念上进行更多的探索并吸引公开讨论,所以如果问题不是太具体,请多多包涵……感谢任何理解分享。谢谢。
scala - 没有 Scala 的联合泛型类型
这工作正常:
这也很好:
但是,我们如何实现这一点以返回类型 A 或 B?
是否有可能拥有泛型类型A和B的联合类型?干杯。
更新1
Either
是一个选项但并不理想,因为它在处理返回的结果时需要模式匹配。实际上,我想要这个:A <: A|B
, B <: A|B
,Either
但没有实现。
另一个极端,我可以这样做,但是类型太松了:
UPDATE2
为什么我不想要Either
?
(原因 2)
返回的结果实际上是 a Dataset
of Spark,这需要Encoder
任何不是子类型的类型Product
(请参阅本主题)。
多次调用此方法时,最终会出现太多的Either
相互包装,例如Either[Either[Either[...]]]
. 这需要定义太多的编码器。
所以我实际上是这样做的:
如果我这样做,由于类型的不同,它需要有许多不同的编码器Either
:
scala - 使用 kryo 期望在 Scala Spark 中编码 Guava 缓存
我正在使用 GuavaCache (带有scalacache 包装器),我想在一些火花管道中存储和使用整个缓存对象。
我使用 kryo 编码器进行编码SomeService
,这是一个包含缓存的案例类。运行代码时,抛出以下异常:
Guava 缓存是可序列化的,因此我认为 Kryo 编码器应该能够对其进行编码。LocalManualCache类也被定义为Serializable
- 当我直接使用番石榴缓存时同样的错误
- 我不想使用
asMap
,因为每条记录的 TTL 都会丢失。
我可以以某种方式手动编码和解码 gauva 缓存对象吗?还是不可能使用自定义缓存?
java - 用于不可变数据类型的 spark sql 编码器
在编写 java 代码时,我通常使用不可变值类型。有时它是通过库(Immutables、AutoValue、Lombok),但大多只是普通的 java 类:
- 所有
final
领域 - 以所有字段为参数的构造函数
(鉴于当前的 spark 支持,此问题适用于 java 11 及更低版本)。
在 Spark Sql 中,数据类型需要一个Encoder
. 使用现成的编码器Encoder.bean(MyType.class)
,如使用这种不可变的数据类型会导致“非法反射访问操作”。
我很好奇 spark sql (dataset) 方法是什么。显然我可以放松这一点,让它成为一个可变的 pojo。
更新
查看Encoders.bean
它的代码确实必须是一个经典的、可变的 POJO。反射代码寻找合适的设置器。此外(并且记录在案)唯一支持的集合类型是array
,list
和map
(not set
)。