1

我正在尝试按照Akka Kafka Streams文档使用 Akka Kafka Streams。这是我的代码:

ConsumerSettings<byte[], ETLProcessMessage> consumerSettings = ConsumerSettings
                .create(actorSystem, new ByteArrayDeserializer(), new KafkaJacksonSerializer<>(ETLProcessMessage.class))
                .withBootstrapServers(kafkaServers)
                .withGroupId(consumerGroupId)
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
                .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> {
                    handlePartitionedRequest(msg.record().value());
                    return Done.getInstance();
                }))
                .runWith(Sink.ignore(), materializer);

但是上面的代码在 runwith() 处显示编译器错误: 在此处输入图像描述

这是 KafkaJacksonSerializer 的代码:

import com.adaequare.mapro.common.exception.AppException;
import com.adaequare.mapro.config.jackson.PostConstructDeserializer;
import com.adaequare.mapro.model.transformer.JSONTransformer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.io.CharStreams;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;

public class KafkaJacksonSerializer<T> implements Serializer<T>, Deserializer<T>{    
    private ObjectReader objectReader;
    private ObjectWriter objectWriter;
    private ObjectMapper objectMapper;

    public KafkaJacksonSerializer(){   
    }

    public KafkaJacksonSerializer(Class<T> persistentClass) {
        objectMapper = new ObjectMapper();

        SimpleModule module = new SimpleModule();
        module.setDeserializerModifier(new BeanDeserializerModifier() {
            @Override
            public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config,
                                                          BeanDescription beanDesc, final JsonDeserializer<?> deserializer) {
                return new PostConstructDeserializer(deserializer);
            }
        });
        objectMapper.registerModule(module);

        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
                .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
                .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));

        objectReader = objectMapper.readerFor(persistentClass);
        objectWriter = objectMapper.writer();
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        InputStream stream = new ByteArrayInputStream(data);
        if(stream == null){
            return null;
        }

        try {
            String json = CharStreams.toString(new InputStreamReader(stream));
            return objectReader.readValue(json);
        } catch (IOException e) {
            throw AppException.forException("Error while unmarshalling AssetData JSON: "+e.getMessage(), e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if(data == null){
            return null;
        }

        try {
            return objectWriter.writeValueAsBytes(data);
        } catch (IOException e) {
            throw AppException.forException("Error while marshalling JSON: "+e.getMessage(), e);
        }
    }

    @Override
    public void close() {
    }
}

我不确定到底是什么问题。但是下面的代码没有显示任何错误:

ConsumerSettings newconsumerSettings = ConsumerSettings
                .create(actorSystem, new ByteArrayDeserializer(), new StringDeserializer())
                .withBootstrapServers(kafkaServers)
                .withGroupId(consumerGroupId)
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer.committableSource(newconsumerSettings, Subscriptions.topics("topic2"))
                .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> Done.getInstance()))
                .runWith(Sink.ignore(), materializer);

有人可以帮我确定这里出了什么问题吗?

4

1 回答 1

1

添加的依赖项之间存在 akka 版本不匹配。一旦我将它们更正为相同,我就不会再看到编译错误。

以下是我使用的依赖项:

compile 'com.typesafe.akka:akka-actor_2.12:2.5.4'
compile 'com.typesafe.akka:akka-cluster_2.12:2.5.4'
compile 'com.typesafe.akka:akka-cluster-tools_2.12:2.5.4'
compile 'com.typesafe.akka:akka-slf4j_2.12:2.5.4'

这是我为响应式 kafka 新添加的内容:

compile 'com.typesafe.akka:akka-stream-kafka_2.12:0.21'

将 akka(actor/cluster 相关)依赖项升级到 2.5.9 后,编译错误消失了。

于 2018-06-11T08:54:13.783 回答