我正在尝试按照Akka Kafka Streams文档使用 Akka Kafka Streams。这是我的代码:
ConsumerSettings<byte[], ETLProcessMessage> consumerSettings = ConsumerSettings
.create(actorSystem, new ByteArrayDeserializer(), new KafkaJacksonSerializer<>(ETLProcessMessage.class))
.withBootstrapServers(kafkaServers)
.withGroupId(consumerGroupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
.mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> {
handlePartitionedRequest(msg.record().value());
return Done.getInstance();
}))
.runWith(Sink.ignore(), materializer);
这是 KafkaJacksonSerializer 的代码:
import com.adaequare.mapro.common.exception.AppException;
import com.adaequare.mapro.config.jackson.PostConstructDeserializer;
import com.adaequare.mapro.model.transformer.JSONTransformer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.io.CharStreams;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
public class KafkaJacksonSerializer<T> implements Serializer<T>, Deserializer<T>{
private ObjectReader objectReader;
private ObjectWriter objectWriter;
private ObjectMapper objectMapper;
public KafkaJacksonSerializer(){
}
public KafkaJacksonSerializer(Class<T> persistentClass) {
objectMapper = new ObjectMapper();
SimpleModule module = new SimpleModule();
module.setDeserializerModifier(new BeanDeserializerModifier() {
@Override
public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config,
BeanDescription beanDesc, final JsonDeserializer<?> deserializer) {
return new PostConstructDeserializer(deserializer);
}
});
objectMapper.registerModule(module);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
.withFieldVisibility(JsonAutoDetect.Visibility.ANY)
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
objectReader = objectMapper.readerFor(persistentClass);
objectWriter = objectMapper.writer();
}
@Override
public T deserialize(String topic, byte[] data) {
InputStream stream = new ByteArrayInputStream(data);
if(stream == null){
return null;
}
try {
String json = CharStreams.toString(new InputStreamReader(stream));
return objectReader.readValue(json);
} catch (IOException e) {
throw AppException.forException("Error while unmarshalling AssetData JSON: "+e.getMessage(), e);
}
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, T data) {
if(data == null){
return null;
}
try {
return objectWriter.writeValueAsBytes(data);
} catch (IOException e) {
throw AppException.forException("Error while marshalling JSON: "+e.getMessage(), e);
}
}
@Override
public void close() {
}
}
我不确定到底是什么问题。但是下面的代码没有显示任何错误:
ConsumerSettings newconsumerSettings = ConsumerSettings
.create(actorSystem, new ByteArrayDeserializer(), new StringDeserializer())
.withBootstrapServers(kafkaServers)
.withGroupId(consumerGroupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer.committableSource(newconsumerSettings, Subscriptions.topics("topic2"))
.mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> Done.getInstance()))
.runWith(Sink.ignore(), materializer);
有人可以帮我确定这里出了什么问题吗?