6

我需要创建一个以字符串键 HashMap 作为值的状态存储。我尝试了以下两种方法。

// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
          .withKeys(Serdes.String())
          .withValues(HashMap.class)
          .persistent()
          .build();

// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();

StateStoreSupplier avgStore1 = Stores.create("Avgs")
          .withKeys(Serdes.String())
          .withValues(Serdes.serdeFrom(h.getClass()))
          .persistent()
          .build();

代码编译正常,没有任何错误,但出现运行时错误

io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer

有人可以建议我创建国营商店的正确方法是什么?

4

1 回答 1

6

如果要创建状态存储,则需要为要使用的类型提供序列化器和反序列化器类。在 Kafka Stream 中,有一个称为Serde的抽象,它将序列化器和反序列化器包装在一个类中。

如果你使用.withValues(Class<K> keyClass)它必须持有

@param keyClass 键的类,它必须是 Kafka 内置 serdes 的类型之一

因为没有内置的SerdesHashMap你需要先实现一个(可能称为HashMapSerde)并将这个类赋予方法.withValues(Serde<K> keySerde)。此外,您还必须实现实际的序列化器和反序列化器HashMap。如果您知道 HashMap 的泛型类型,则应该指定它们(这使得序列化器和反序列化器的实现更加简单。

像这样的东西(只是一个草图;省略了泛型):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;

public class HashMapSerde implements Serde<HashMap> {

    void configure(Map<String, ?> configs, boolean isKey) {
        /* put your code here */
    }

    void close() {
        /* put your code here */
    }

    Serializer<HashMap> serializer() {
        return new Serializer<HashMap>() {
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            public byte[] serialize(String topic, T data) {
                /* put your code here */
            }

            public void close() {
                /* put your code here */
            }
        };
    }

    Deserializer<HashMap> deserializer() {
        return new Deserializer<HashMap>() {
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            public T deserialize(String topic, byte[] data) {
                /* put your code here */
            }

            public void close() {
                /* put your code here */
            }
        };
    }
}

如果您想查看如何实现(反)序列化程序的示例,请Serde查看https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/常见/序列化https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

于 2016-08-29T10:44:53.570 回答