2

当我使用来自 kinesis 流的消息时。我得到一些带有标题等的垃圾字符

    @StreamListener(Processor.INPUT)
    public void receive(String message) {       
        System.out.println("Message recieved: "+message);
        throw new RuntimeException("Exception thrown");
    }

    @StreamListener("errorChannel")
    public void transform(ErrorMessage errorMessage) throws UnsupportedEncodingException {      

        //original paylaod 
        System.out.println("Error Oiginal Message Payload"+new String((byte[])errorMessage.getOriginalMessage().getPayload(), "UTF-8"));
        System.out.println("Error Original Message Stream channel"+errorMessage.getOriginalMessage().getHeaders().get("aws_receivedStream"));
    }

应用 yml

spring:
  cloud:
    stream:
      bindings:
        input: 
          group: abcd
          destination: stream
          content-type: application/json
          errorChannelEnabled: true
          consumer:
            headerMode: raw

我在侦听器和 errorChannel 都得到了带有垃圾字符的输出

我正在尝试在 errorChannel 中提取原始消息。这是转换字节消息的正确方法吗?

Message recieved: ?contentType "application/json"{"aa":"cc"}
4

1 回答 1

3

AWS Kinesis 不提供任何标头实体。因此,为了利用 Spring Cloud Stream 中的这种功能,我们将标头嵌入到 Kinesis 记录的主体中。为此,默认headerMode情况embeddedHeaders下在 Kinesis Binder 中。并且为了生产者和消费者之间的对称性,这个选项不能改变。

EmbeddedHeadersChannelInterceptor该框架为目标通道提供了开箱即用的功能,@StreamListener并且嵌入的标头被提取并正确填充到要发送的消息中。

当我们处理 中的错误时errorChannel,我们确实有一个errorMessage.getOriginalMessage()as non-transformed - original。因此payload,该消息byte[]来自包含嵌入标题的记录正文。

如果您想正确解析它们。你应该使用实用程序:

EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);
于 2018-03-20T21:55:57.727 回答