When my Spark application tries to serialize a protobuf field that is a map with string keys and float values, I get the following error. The application uses Kryo serialization.

Caused by: java.lang.NullPointerException
    at com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
    at java.util.AbstractList.add(AbstractList.java:108)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 71 more

Has anyone run into this? Is there a way to fix it?

4 Answers

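When Kryo encounters an object of a class that has not been registered, it falls back to a generic default serializer (FieldSerializer). That serializer reads and writes the object field by field via reflection. For protobuf-generated classes, this means Kryo walks the message's internal fields, which is how it ends up in ObjectField and CollectionSerializer in the trace above.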
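
The failing frames can be mimicked in plain Java, without Kryo or protobuf. This is a simulation under stated assumptions:

- WrappingList is a hypothetical stand-in for protobuf's UnmodifiableLazyStringList.
- Passing null to its constructor imitates an instance created without running its constructor. This is an assumption about how Kryo instantiated it here; the trace does not prove it.

A collection deserializer then calls add() for each element. AbstractList.add() calls size() first, and size() dereferences the null delegate.

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FieldCopyNpeDemo {

    // Hypothetical stand-in for UnmodifiableLazyStringList: a read-only
    // AbstractList whose size() delegates to a field set by the constructor.
    static final class WrappingList extends AbstractList<String> {
        private final List<String> list;

        WrappingList(List<String> list) {
            this.list = list;
        }

        @Override
        public String get(int index) {
            return list.get(index);
        }

        @Override
        public int size() {
            return list.size(); // throws NullPointerException if list is null
        }
    }

    // Mimics a collection deserializer: take an "empty" instance and add() each element.
    static String tryFill(List<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return "ok";
        } catch (NullPointerException e) {
            return "NullPointerException";
        } catch (UnsupportedOperationException e) {
            return "UnsupportedOperationException";
        }
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b");
        // Delegate never set, as if the constructor was skipped: add() -> size() -> NPE.
        System.out.println(tryFill(new WrappingList(null), data)); // prints NullPointerException
        // Delegate set, but the list is read-only: AbstractList.add(int, E) throws.
        System.out.println(tryFill(new WrappingList(new ArrayList<>()), data)); // prints UnsupportedOperationException
        // An ordinary mutable list works.
        System.out.println(tryFill(new ArrayList<>(), data)); // prints ok
    }
}
```

Even with a properly initialized delegate, the wrapper is read-only, so rebuilding it element by element cannot work. Serializing the whole message through its own byte format avoids the problem.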

However, Kryo can be configured to throw an exception instead:

final Kryo kryo = new Kryo();
kryo.setRegistrationRequired(true);

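I decided to keep this registration requirement because it surfaces classes that would otherwise silently go through slow, generic serialization and hurt performance.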
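
To make the difference between the two modes concrete, here is a toy model in plain Java. ToyRegistry and its methods are hypothetical and only imitate the behaviour described above; they are not Kryo's API. With registration not required, an unknown class goes through a generic fallback. With registration required, the call fails fast.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RegistrationDemo {

    // Hypothetical toy registry imitating the two modes described above; not Kryo's API.
    static final class ToyRegistry {
        private final Map<Class<?>, Function<Object, String>> serializers = new HashMap<>();
        private final boolean registrationRequired;

        ToyRegistry(boolean registrationRequired) {
            this.registrationRequired = registrationRequired;
        }

        void register(Class<?> type, Function<Object, String> serializer) {
            serializers.put(type, serializer);
        }

        String serialize(Object value) {
            Function<Object, String> serializer = serializers.get(value.getClass());
            if (serializer != null) {
                return serializer.apply(value);
            }
            if (registrationRequired) {
                // Fail fast instead of silently using a generic serializer.
                throw new IllegalArgumentException("Class is not registered: " + value.getClass().getName());
            }
            // Generic fallback: class name plus a catch-all representation.
            return value.getClass().getName() + ":" + value;
        }
    }

    public static void main(String[] args) {
        ToyRegistry lenient = new ToyRegistry(false);
        System.out.println(lenient.serialize(42)); // prints java.lang.Integer:42

        ToyRegistry strict = new ToyRegistry(true);
        strict.register(Integer.class, v -> "int:" + v);
        System.out.println(strict.serialize(42)); // prints int:42
        try {
            strict.serialize("text");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints Class is not registered: java.lang.String
        }
    }
}
```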

To handle serialization of Protobuf-generated classes, I use the following class:

package com.juarezr.serialization;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.AbstractMessage;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ProtobufSerializer<T extends AbstractMessage> extends Serializer<T> implements Serializable {
    
    static final long serialVersionUID = 1667386898559074449L;
    protected final Method parser;

    public ProtobufSerializer(final Class<T> protoMessageClass) {
        try {
            this.parser = protoMessageClass.getDeclaredMethod("parseFrom", byte[].class);
            this.parser.setAccessible(true);
        } catch (SecurityException | NoSuchMethodException ex) {
            throw new IllegalArgumentException(protoMessageClass.toString() + " doesn't have a protobuf parser", ex);
        }
    }

    @Override
    public void write(final Kryo kryo, final Output output, final T protobufMessage) {
        if (protobufMessage == null) {
            // A single Kryo.NULL (0) marks a null message.
            output.writeByte(Kryo.NULL);
            output.flush();
            return;
        }
        // Write length + 1 so that an empty message (0 bytes) is not mistaken for null.
        final byte[] bytes = protobufMessage.toByteArray();
        output.writeInt(bytes.length + 1, true);
        output.writeBytes(bytes);
        output.flush();
    }

    @SuppressWarnings({"unchecked", "JavaReflectionInvocation"})
    @Override
    public T read(final Kryo kryo, final Input input, final Class<T> protoMessageClass) {
        final int length = input.readInt(true);
        if (length == Kryo.NULL) {
            return null;
        }
        // Undo the + 1 offset applied in write().
        final Object bytesRead = input.readBytes(length - 1);
        try {
            // parseFrom(byte[]) is static, so Method.invoke ignores its first argument.
            final Object parsed = this.parser.invoke(protoMessageClass, bytesRead);
            return (T) parsed;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Unable to deserialize protobuf for class: " + protoMessageClass.getName(), e);
        }
    }

    @Override
    public boolean getAcceptsNull() {
        return true;
    }

    @SuppressWarnings("unchecked")
    public static <M extends AbstractMessage> void registerMessagesFrom(final M rootMessage, final Kryo kryo) {

        final Class<M> messageClass = (Class<M>) rootMessage.getClass();
        final ProtobufSerializer<M> serializer = new ProtobufSerializer<>(messageClass);
        kryo.register(messageClass, serializer);

        final Class<?>[] nestedClasses = messageClass.getDeclaredClasses();
        for (final Class<?> innerClass : nestedClasses) {
            if ((AbstractMessage.class).isAssignableFrom(innerClass)) {
                final Class<M> typedClass = (Class<M>) innerClass;
                final ProtobufSerializer<M> serializer2 = new ProtobufSerializer<>(typedClass);
                kryo.register(typedClass, serializer2);
            }
        }
    }
}
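
The write/read pair above stores a null message as a single 0. Every other message is stored as length + 1, followed by the bytes. The offset matters because a protobuf message with no fields set serializes to zero bytes, which would otherwise be indistinguishable from null.

Below is a plain-Java re-creation of that framing. NullMarkerCodec is a hypothetical name, and it does not use Kryo's Output or Input. The prefix uses the 7-bits-per-byte varint scheme that Kryo's writeInt(value, true) uses for non-negative values.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class NullMarkerCodec {

    // Unsigned varint: 7 bits per byte, low bits first, high bit set while more bytes follow.
    static void writeVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // null -> a single 0; otherwise (length + 1) as a varint, then the payload bytes.
    static byte[] encode(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (payload == null) {
            writeVarInt(out, 0);
        } else {
            writeVarInt(out, payload.length + 1);
            out.write(payload, 0, payload.length);
        }
        return out.toByteArray();
    }

    // Reverses encode(): reads the varint prefix and returns null for the 0 marker.
    static byte[] decode(byte[] data) {
        int pos = 0;
        int value = 0;
        int shift = 0;
        int b;
        do {
            b = data[pos++] & 0xFF;
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        if (value == 0) {
            return null;
        }
        return Arrays.copyOfRange(data, pos, pos + value - 1);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encode(null)));                      // prints [0]
        System.out.println(Arrays.toString(encode(new byte[0])));               // prints [1]
        System.out.println(Arrays.toString(decode(encode(new byte[] {7, 8})))); // prints [7, 8]
        System.out.println(decode(encode(null)));                               // prints null
    }
}
```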

You can configure serialization as follows:

// ...
final Kryo kryo = new Kryo();
kryo.setRegistrationRequired(true);

// Add a registration for each generated file and top level class ...
ProtobufSerializer.registerMessagesFrom(MyProtoEnclosingClass.MyProtoTopLevelClass.getDefaultInstance(), kryo);

// Add a registration for each other Java/Scala class you would need...
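
One limitation is worth checking. registerMessagesFrom only looks at the classes declared directly inside the root message, so a message nested two levels deep (Outer.Inner.Deep) is not registered. With setRegistrationRequired(true), serializing such an instance directly (for example, as an RDD element) would then fail.

The sketch below compares that one-level scan with a recursive variant. It uses hypothetical Message/Outer classes instead of protobuf. Wiring the recursive walk into ProtobufSerializer is an untested assumption, not part of the answer's code.

```java
import java.util.ArrayList;
import java.util.List;

public class NestedClassScan {

    // Hypothetical stand-ins: Message plays the role of AbstractMessage,
    // Outer a generated message with nested message types.
    static class Message {}

    static class Outer extends Message {
        static class Builder {}                  // not a Message: never registered
        static class Inner extends Message {
            static class Deep extends Message {} // two levels below Outer
        }
    }

    // One level, like the loop in registerMessagesFrom.
    static List<Class<?>> scanOneLevel(Class<?> root) {
        List<Class<?>> found = new ArrayList<>();
        found.add(root);
        for (Class<?> nested : root.getDeclaredClasses()) {
            if (Message.class.isAssignableFrom(nested)) {
                found.add(nested);
            }
        }
        return found;
    }

    // Recursive variant: also descends into the nested classes' own nested classes.
    static List<Class<?>> scanRecursive(Class<?> root) {
        List<Class<?>> found = new ArrayList<>();
        collect(root, found);
        return found;
    }

    private static void collect(Class<?> type, List<Class<?>> found) {
        if (Message.class.isAssignableFrom(type)) {
            found.add(type);
        }
        for (Class<?> nested : type.getDeclaredClasses()) {
            collect(nested, found);
        }
    }

    public static void main(String[] args) {
        System.out.println(scanOneLevel(Outer.class).contains(Outer.Inner.Deep.class));  // prints false
        System.out.println(scanRecursive(Outer.class).contains(Outer.Inner.Deep.class)); // prints true
    }
}
```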
answered 2020-12-09T15:33:41.247

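You have to register a ProtobufSerializer with Kryo to serialize protobuf messages. Note that this snippet uses Apache Flink's StreamExecutionEnvironment API rather than Spark's: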

StreamExecutionEnvironment.getExecutionEnvironment()
                          .registerTypeWithKryoSerializer(YourProtobufClass.class, 
                                                          ProtobufSerializer.class); 

Add the following dependency, which provides the ProtobufSerializer class:

<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.45</version>
</dependency>
answered 2019-09-26T06:47:47.160

You can register a ProtobufSerializer with Kryo to serialize protobuf messages:

  • First: include the dependency:
"de.javakaffee" % "kryo-serializers" % "0.43" // in sbt
  • Second: extend Spark's KryoRegistrator
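package com.my.serializer

import com.esotericsoftware.kryo.Kryo
import de.javakaffee.kryoserializers.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator

class ExtendedKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[YourProtoMessageClass], new ProtobufSerializer())
  }
}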
  • Third: set ExtendedKryoRegistrator in the Spark conf
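val conf = new SparkConf().setAppName("appName")

// The registrator only takes effect when Kryo is the configured serializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "com.my.serializer.ExtendedKryoRegistrator")

val spark = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()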
answered 2021-03-10T02:40:09.267

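Setting this in the configuration fixes the error. Note that it switches the entire application from Kryo to Java serialization, not just the protobuf classes: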

spark.serializer=org.apache.spark.serializer.JavaSerializer
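
For comparison, Spark's JavaSerializer is built on the JDK's ObjectOutputStream and ObjectInputStream. The plain-Java round trip below involves no Spark; Scores is a hypothetical class. It shows what that means for a Serializable object holding a string-to-float map like the one in the question. Spark's tuning guide notes that Java serialization is generally slower and produces larger output than Kryo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class JavaSerializationDemo {

    // Hypothetical class with a string-to-float map, mirroring the question's field.
    static final class Scores implements Serializable {
        private static final long serialVersionUID = 1L;
        final HashMap<String, Float> values = new HashMap<>();
    }

    // Serializes any Serializable object to bytes with the JDK's built-in mechanism.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    // Reads back an object written by toBytes().
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Scores scores = new Scores();
        scores.values.put("a", 1.5f);
        Scores copy = (Scores) fromBytes(toBytes(scores));
        System.out.println(copy.values); // prints {a=1.5}
    }
}
```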
answered 2021-08-10T11:26:26.810