2

我正在努力配置 Spring Integration DSL 转换器使用的“自定义”ObjectMapper。我收到一个java.time.Instant我想解析为对象属性的 json 表示。即:
{“type”:“TEST”,“source”:“TEST”,“timestamp”:{“epochSecond”:1454503381,“nano”:335000000}}

该消息是一条 kafka 消息,它提出了一个问题:我应该编写一个自定义序列化程序来实现 Kafka 编码器/解码器,以便能够将 kafka 消息转换为正确的对象,还是 spring-integration 必须自动执行此操作?

固件/依赖项和版本:
Spring Boot - 1.3.2.RELEASE
Spring Integration Java Dsl - 1.1.1.RELEASE
FasterXml Jackson - 2.6.5

我已按照 Jackson 文档将此 Java 配置添加到项目中: https ://github.com/FasterXML/jackson-datatype-jsr310

@Configuration
public class IntegrationConfiguration {

    @Bean
    public JsonObjectMapper<JsonNode, JsonParser> jsonObjectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        return new Jackson2JsonObjectMapper(mapper);
    }
}

以及以下 Jackson JSR-310 人工制品:

<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
    <version>2.6.5</version>
</dependency>

根据 Spring 博客上的这篇文章,我什至不必注册新的 Java8 时间模块。 https://spring.io/blog/2014/12/02/latest-jackson-integration-improvements-in-spring#jackson-modules

这是我得到的例外:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class java.time.Instant]: can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?)
 at [Source: {"type":"TEST","source":"TEST","timestamp":{"epochSecond":1454503381,"nano":335000000}}; line: 1, column: 71] (through reference chain: my.app.MyDto["timestamp"])
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1106)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:296)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520)
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:95)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:258)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:125)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2764)
    at org.springframework.integration.support.json.Jackson2JsonObjectMapper.fromJson(Jackson2JsonObjectMapper.java:75)
    at org.springframework.integration.support.json.Jackson2JsonObjectMapper.fromJson(Jackson2JsonObjectMapper.java:44)
    at org.springframework.integration.support.json.AbstractJacksonJsonObjectMapper.fromJson(AbstractJacksonJsonObjectMapper.java:56)
    at org.springframework.integration.json.JsonToObjectTransformer.doTransform(JsonToObjectTransformer.java:78)
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33)
    ... 74 more

解决方案:
问题是我希望 Spring 会检测到 jackson-datatype-jsr310 原型并注册 JavaTimeModule,但事实并非如此,这完全没问题。有两种方法可以解决这个问题:
1. 如果我们按原样使用 Spring Boot 和 Spring Integration,则可以接受的答案。
2. 如果使用 Spring Integration Dsl,只需将 IntegrationConfiguration 类与 jsonObjectMapper() bean 一起保留并像这样使用它:

@Autowired
private JsonObjectMapper jsonObjectMapper;    

return IntegrationFlows
        .from(inboundChannel())
        .transform(Transformers.fromJson(myDto.class, jsonObjectMapper))
        ...
4

2 回答 2

2

强制 Spring Integration 使用它与 Spring Boot 无关。

你只需要配置JsonToObjectTransformer你的jsonObjetMapper()

@Bean
@Transformer(inputChannel="input", outputChannel="output")
JsonToObjectTransformer jsonToObjectTransformer() {
    return new JsonToObjectTransformer(jsonObjectMapper());
}

虽然没有理由注册JsonObjectMapper为 bean。

于 2016-02-03T16:39:12.720 回答
0

您是否为通道适配器定义了编码器?

您应该始终为您使用的任何适配器使用编码器,入站通道适配器或消息驱动通道适配器。

在您的情况下, StringEncoder 应该可以解决问题。

<bean id="myEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>
于 2016-02-03T15:10:03.257 回答