1

我有一个具有以下内容类型配置的生产者

spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
          content-type: application/json

在消费者方面,我使用弹簧集成(KinesisMessageDrivenChannelAdapter)将事件路由到不同的通道。当我在监听器类上收到消息时,如下所示:

@ServiceActivator(inputChannel = "channelA")
    void handleMessage(Message<?> msg) {
        objectMapper.readValue(msg.getPayload(), MyEvent.class);
    }

编组到 MyEvent 失败。在堆栈错误中,我可以看到内容类型是有效负载的一部分,有效负载仍未从 json 反序列化为 POJO。

我想知道如何在进行任何其他转换之前反序列化消息。我没有找到任何可以将 MessageConverter 设置为适配器的方法。

我感谢您的帮助。

谢谢

4

1 回答 1

2

听起来你的生产者是 Spring Cloud Stream,但消费者只是普通KinesisMessageDrivenChannelAdapter的。目前尚不清楚为什么不使用 Spring Cloud Stream 消费者,但无论如何......

SCSt 生产者将消息头与有效负载一起序列化到 Kinesis 记录正文中的问题。仅仅因为 AWS Kinesis 本身不支持标头。

如果您真的对消费者端的标头不感兴趣,您可以在生产者端禁用嵌入标头:

spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
        producer:
        headerMode: none

KinesisMessageDrivenChannelAdapter否则,除非您EmbeddedHeaderUtils手动使用,否则您别无选择。

于 2018-06-18T19:23:24.040 回答