问题描述
我们有一个 Hadoop 集群,我们在其上存储使用Kryo(序列化框架)序列化为字节的数据。我们用来执行此操作的 Kryo 版本已从官方版本 2.21 中分叉出来,以将我们自己的补丁应用到我们在使用 Kryo 时遇到的问题。当前的 Kryo 版本 2.22 也修复了这些问题,但有不同的解决方案。因此,我们不能只更改我们使用的 Kryo 版本,因为这意味着我们将无法再读取已经存储在 Hadoop 集群上的数据。为了解决这个问题,我们想运行一个 Hadoop 作业
- 读取存储的数据
- 反序列化使用旧版本 Kryo 存储的数据
- 用新版本的 Kryo 序列化恢复的对象
- 将新的序列化表示写回我们的数据存储
问题在于,在一个 Java 程序中(更准确地说,在 Hadoop 作业的映射器类中)使用同一类的两个不同版本并非易事。
简而言之问题
如何在一个 Hadoop 作业中使用同一序列化框架的两个不同版本反序列化和序列化对象?
相关事实概述
- 我们将数据存储在 Hadoop CDH4 集群上,使用 Kryo 版本 2.21.2-ourpatchbranch 进行序列化
- 我们希望使用与我们的版本不兼容的 Kryo 版本 2.22 对数据进行序列化
- 我们使用 Apache Maven 构建我们的 Hadoop 作业 JAR
可能(和不可能)的方法
(1) 重命名包
我们想到的第一种方法是使用Maven Shade 插件的重定位功能重命名我们自己的 Kryo 分支中的包,并使用不同的工件 ID 发布它,以便我们可以在转换作业项目中依赖这两个工件。然后,我们将实例化一个旧版本和新版本的 Kryo 对象,并使用旧版本进行反序列化,使用新版本再次序列化该对象。
问题
我们没有在 Hadoop 作业中明确使用 Kryo,而是通过我们自己的库的多层访问它。对于这些库中的每一个,都需要
- 重命名涉及的包和
- 创建具有不同组或工件 ID 的版本
为了让事情变得更加混乱,我们还使用了其他 3rd 方库提供的 Kryo 序列化程序,我们必须为此做同样的事情。
(2) 使用多个类加载器
我们提出的第二种方法是在包含转换作业的 Maven 项目中完全不依赖 Kryo,而是从存储在 Hadoop 的分布式缓存中的每个版本的 JAR 中加载所需的类。序列化一个对象看起来像这样:
public byte[] serialize(Object foo, JarClassLoader cl) {
final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
Object k = kryoClass.getConstructor().newInstance();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output");
Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos);
Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class);
writeObject.invoke(k, output, foo);
outputClass.getMethod("close").invoke(output);
baos.close();
byte[] bytes = baos.toByteArray();
return bytes;
}
问题
尽管这种方法可以用于实例化未配置的 Kryo 对象并序列化/恢复某些对象,但我们使用了更复杂的 Kryo 配置。这包括几个自定义序列化程序、注册的类 id 等等。例如,我们无法找到一种方法来为类设置自定义序列化程序而不会得到 NoClassDefFoundError - 以下代码不起作用:
Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError
最后一行抛出一个
java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer
因为URISerializer
该类引用了 Kryo 的Serializer
类并尝试使用它自己的类加载器(即 System 类加载器)来加载它,它不知道Serializer
该类。
(3) 使用中间序列化
目前最有前途的方法似乎是使用独立的中间序列化,例如使用Gson或类似方法的 JSON,然后运行两个单独的作业:
- kryo:2.21.2-ourpatchbranch 在我们的常规存储中 -> JSON 在临时存储中
- 临时存储中的 JSON -> 我们常规存储中的 kryo:2-22
问题
这个解决方案的最大问题是它大约使处理的数据的空间消耗增加了一倍。此外,我们需要另一种序列化方法,它对我们所有的数据都没有问题,我们需要首先对其进行调查。