16

问题描述

我们有一个 Hadoop 集群,我们在其上存储使用Kryo(序列化框架)序列化为字节的数据。我们用来执行此操作的 Kryo 版本已从官方版本 2.21 中分叉出来,以将我们自己的补丁应用到我们在使用 Kryo 时遇到的问题。当前的 Kryo 版本 2.22 也修复了这些问题,但有不同的解决方案。因此,我们不能只更改我们使用的 Kryo 版本,因为这意味着我们将无法再读取已经存储在 Hadoop 集群上的数据。为了解决这个问题,我们想运行一个 Hadoop 作业

  1. 读取存储的数据
  2. 反序列化使用旧版本 Kryo 存储的数据
  3. 用新版本的 Kryo 序列化恢复的对象
  4. 将新的序列化表示写回我们的数据存储

问题在于,在一个 Java 程序中(更准确地说,在 Hadoop 作业的映射器类中)使用同一类的两个不同版本并非易事。

简而言之问题

如何在一个 Hadoop 作业中使用同一序列化框架的两个不同版本反序列化和序列化对象?

相关事实概述

  • 我们将数据存储在 Hadoop CDH4 集群上,使用 Kryo 版本 2.21.2-ourpatchbranch 进行序列化
  • 我们希望使用与我们的版本不兼容的 Kryo 版本 2.22 对数据进行序列化
  • 我们使用 Apache Maven 构建我们的 Hadoop 作业 JAR

可能(和不可能)的方法

(1) 重命名包

我们想到的第一种方法是使用Maven Shade 插件的重定位功能重命名我们自己的 Kryo 分支中的包,并使用不同的工件 ID 发布它,以便我们可以在转换作业项目中依赖这两个工件。然后,我们将实例化一个旧版本和新版本的 Kryo 对象,并使用旧版本进行反序列化,使用新版本再次序列化该对象。

问题
我们没有在 Hadoop 作业中明确使用 Kryo,而是通过我们自己的库的多层访问它。对于这些库中的每一个,都需要

  1. 重命名涉及的包和
  2. 创建具有不同组或工件 ID 的版本

为了让事情变得更加混乱,我们还使用了其他 3rd 方库提供的 Kryo 序列化程序,我们必须为此做同样的事情。


(2) 使用多个类加载器

我们提出的第二种方法是在包含转换作业的 Maven 项目中完全不依赖 Kryo,而是从存储在 Hadoop 的分布式缓存中的每个版本的 JAR 中加载所需的类。序列化一个对象看起来像这样:

public byte[] serialize(Object foo, JarClassLoader cl) {
    final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
    Object k = kryoClass.getConstructor().newInstance();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output");

    Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos);
    Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class);
    writeObject.invoke(k, output, foo);
    outputClass.getMethod("close").invoke(output);
    baos.close();
    byte[] bytes = baos.toByteArray();
    return bytes;
}

问题
尽管这种方法可以用于实例化未配置的 Kryo 对象并序列化/恢复某些对象,但我们使用了更复杂的 Kryo 配置。这包括几个自定义序列化程序、注册的类 id 等等。例如,我们无法找到一种方法来为类设置自定义序列化程序而不会得到 NoClassDefFoundError - 以下代码不起作用:

Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError

最后一行抛出一个

java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer

因为URISerializer该类引用了 Kryo 的Serializer类并尝试使用它自己的类加载器(即 System 类加载器)来加载它,它不知道Serializer该类。


(3) 使用中间序列化

目前最有前途的方法似乎是使用独立的中间序列化,例如使用Gson或类似方法的 JSON,然后运行两个单独的作业:

  1. kryo:2.21.2-ourpatchbranch 在我们的常规存储中 -> JSON 在临时存储中
  2. 临时存储中的 JSON -> 我们常规存储中的 kryo:2-22

问题
这个解决方案的最大问题是它大约使处理的数据的空间消耗增加了一倍。此外,我们需要另一种序列化方法,它对我们所有的数据都没有问题,我们需要首先对其进行调查。

4

3 回答 3

7

我会使用多个类加载器方法。

(包重命名也可以。它看起来确实很丑,但这是一次性的hack,所以美观和正确性可以退居二线。中间序列化似乎有风险 - 你使用 Kryo 是有原因的,这个原因将被否定通过使用不同的中间形式)。

整体设计将是:

child classloaders:      Old Kryo     New Kryo   <-- both with simple wrappers
                                \       /
                                 \     /
                                  \   /
                                   \ /
                                    |
default classloader:    domain model; controller for the re-serialization
  1. 在默认类加载器中加载域对象类
  2. 使用修改后的 Kryo 版本和包装器代码加载 Jar。包装器有一个带有一个参数的静态“main”方法:要反序列化的文件的名称。通过默认类加载器的反射调用 main 方法:

        Class deserializer = deserializerClassLoader.loadClass("com.example.deserializer.Main");
        Method mainIn = deserializer.getMethod("main", String.class);
        Object graph = mainIn.invoke(null, "/path/to/input/file");
    
    1. 这种方法:
      1. 将文件反序列化为一个对象图
      2. 将对象放入共享空间。ThreadLocal是一种简单的方法,或者将其返回给包装脚本。
  3. 当调用返回时,使用带有简单包装器的新序列化框架加载第二个 Jar。包装器有一个静态的“main”方法和一个参数来传递要序列化的文件名。通过默认类加载器的反射调用 main 方法:

        Class serializer = deserializerClassLoader.loadClass("com.example.serializer.Main");
        Method mainOut = deserializer.getMethod("main", Object.class, String.class);
        mainOut.invoke(null, graph, "/path/to/output/file");
    
    1. 这种方法
      1. 从 ThreadLocal 中检索对象
      2. 序列化对象并将其写入文件

注意事项

在代码片段中,为每个对象序列化和反序列化创建一个类加载器。您可能只想加载一次类加载器,发现主要方法并遍历文件,例如:

for (String file: files) {
    Object graph = mainIn.invoke(null, file + ".in");
    mainOut.invoke(null, graph, file + ".out");
}

域对象是否对任何Kryo 类有任何引用?如果是这样,你有困难:

  1. 如果引用只是一个类引用,例如调用一个方法,那么第一次使用该类会将两个 Kryo 版本中的一个加载到默认的类加载器中。这可能会导致问题,因为序列化或反序列化可能由错误版本的 Kryo 执行
  2. 如果引用用于实例化任何 Kryo 对象并将引用存储在域模型(类或实例成员)中,那么 Kryo 实际上将在模型中序列化其自身的一部分。对于这种方法,这可能会破坏交易。

无论哪种情况,您的第一个方法应该是检查这些引用并消除它们。确保您已完成此操作的一种方法是确保默认类加载器无法访问任何Kryo 版本。如果域对象以任何方式引用 Kryo,则引用将失败(如果直接引用该类,则会出现 ClassNotFoundError,如果使用反射,则会出现 ClassNotFoundException)。

于 2013-04-24T00:36:51.103 回答
1

对于 2,您可以创建两个 jar 文件,其中包含序列化程序以及序列化程序新旧版本的所有依赖项,如下所示。然后创建一个 map reduce 作业,将每个版本的代码加载到单独的类加载器中,并在中间添加一些胶水代码,用旧代码反序列化,然后用新代码序列化。

您必须小心,您的域对象与胶水代码加载到同一个类加载器中,并且序列化/反序列化的代码依赖于与胶水代码相同的类加载器,以便它们都看到相同的域对象类。

于 2013-04-23T05:08:24.457 回答
1

我不假思索地想到的最简单的方法是使用额外的 Java 应用程序为您进行转换。因此,您将二进制数据发送到辅助 Java 应用程序(简单的本地套接字可以很好地完成任务),因此您不必摆弄类加载器或包。

唯一需要考虑的是中间表示。您可能想要使用另一种序列化机制,或者如果时间没有问题,您可能想要使用 Java 的内部序列化。

使用第二个 Java 应用程序使您免于处理临时存储并在内存中执行所有操作。

一旦你拥有了这些套接字 + 第二个应用程序代码,你就会发现很多情况都可以派上用场。

也可以使用 jGroups 构建本地集群,毕竟使用套接字可以省去麻烦。jGroups 是我所知道的最简单的通信 API。只需形成一个逻辑通道并检查谁加入。最好它甚至可以在同一个 JVM 中工作,这使得测试变得容易,如果远程完成,可以将不同的物理服务器绑定在一起,就像它在本地应用程序中工作一样。

另一个可变的替代方案是使用 ZeroMQ 及其 ipc(进程间通信)协议。

于 2015-07-04T19:16:58.017 回答