
I have a simple class that consumes messages from a Kafka server. Most of the code is copied from the comments in org.apache.kafka.clients.consumer.KafkaConsumer.java.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Demo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.144.10:29092");
        props.put("group.id", "test");
        props.put("session.timeout.ms", "1000");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
        consumer.subscribe("voltdbexportAUDIT", "voltdbexportTEST");
        boolean isRunning = true;
        while (isRunning) {
            Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
            process(records);
        }
        consumer.close();
    }

    private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
        Map<TopicPartition, Long> processedOffsets = new HashMap<>();
        for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
            List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
            for (int i = 0; i < recordsPerTopic.size(); i++) {
                ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
                // process record
                try {
                    processedOffsets.put(record.topicAndPartition(), record.offset());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return processedOffsets;
    }
}

I am using "org.apache.kafka:kafka-clients:0.8.2.0". It throws this exception:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:430)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:413)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:400)
    at kafka.integration.Demo.main(Demo.java:26)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

How should I configure key.deserializer?


5 Answers


This works out of the box, with no need to implement your own serializers:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");  
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy", "range");
Answered 2015-10-30T18:13:30.523

For the key, use one of the following:

String key

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

JSON key

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Avro key

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

Byte array key

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

Likewise, use one of the following for your value deserializer:

String value

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

JSON value

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Avro value

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

Byte array value

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

Note that for the Avro deserializer you will need the following dependencies:

<dependency> 
    <groupId>io.confluent</groupId> 
    <artifactId>kafka-avro-serializer</artifactId> 
    <version>${confluent.version}</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro</artifactId> 
    <version>${avro.version}</version> 
</dependency>
Answered 2020-04-21T10:24:56.803

You need to set these properties:

props.put("serializer.class","my.own.serializer.StringSupport");
props.put("key.serializer.class","my.own.serializer.LongSupport");

in your main method so that they are passed to the producer's constructor. Of course, you have to specify the correct encoders: the serializer class converts the message into a byte array, and the key.serializer class converts the key object into a byte array. Typically, they can also reverse the process.
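The round trip described above can be sketched in plain Java. This is only an illustration of the idea using UTF-8 strings, not the actual Kafka serializer implementation:

```java
import java.nio.charset.StandardCharsets;

public class StringRoundTrip {
    // Roughly what a String serializer does: object -> byte[]
    static byte[] serialize(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Roughly what a String deserializer does: byte[] -> object
    static String deserialize(byte[] wire) {
        return new String(wire, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello kafka");
        System.out.println(deserialize(wire)); // prints: hello kafka
    }
}
```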

Answered 2015-08-01T01:31:10.877

You are handling byte arrays for both the key and value parameters, so you need byte-array serializers and deserializers.

You can add these properties.

For the key deserializer:

props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

For the value deserializer:

props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
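Since the records here are already byte arrays, a byte-array deserializer is conceptually just an identity function. A rough sketch of the idea (not the actual Kafka source):

```java
public class ByteArrayPassThrough {
    // Roughly what ByteArrayDeserializer does: hand back the raw bytes unchanged
    static byte[] deserialize(byte[] data) {
        return data;
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        System.out.println(deserialize(payload).length); // prints: 3
    }
}
```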
Answered 2016-11-16T12:23:48.963

Make sure you pass the string value of the deserializer class, not the class object (that was my mistake):

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          Serdes.String().deserializer().getClass().getName());

If you forget .getName(), you will get the same exception, which is misleading in this case.
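One way to see why the class object fails is with plain java.util.Properties: getProperty() only returns values that are Strings, so a Class value behaves like a missing key. This is a standalone illustration of that Java behavior (using StringBuilder.class as a stand-in for the deserializer class), not a trace of Kafka's exact code path:

```java
import java.util.Properties;

public class ConfigGotcha {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Wrong: stores a Class object; getProperty() only sees String values
        props.put("key.deserializer", StringBuilder.class);
        System.out.println(props.getProperty("key.deserializer")); // prints: null

        // Right: store the fully qualified class name as a String
        props.put("key.deserializer", StringBuilder.class.getName());
        System.out.println(props.getProperty("key.deserializer")); // prints: java.lang.StringBuilder
    }
}
```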

Answered 2020-04-21T07:50:17.897