2

我有一个配置了 Sleuth 的 Spring Cloud Stream (SCS) Kafka 生产应用程序以进行跟踪。我正在尝试发布一个spring-messaging 'GenericMessage' (即MessageHeaders + payload),需要通过以下方式使用:

  • SCS 消费者(应该继续自动添加到 MessageHeaders 的跟踪)
  • 应该跳过 MessageHeaders 并处理有效负载的非 SCS/Java 消费者。

使用 headerMode=embeddedHeaders(默认): MessageHeader 中的跟踪条目在消息发布之前由 EmbeddedHeaderUtils 附加到消息中。我的非 java 消费者无法处理这个问题,因为 EmbeddedHeaderUtils 不会序列化为纯 JSON,即

?\n invalid-json-headers { payload }

使用 headerMode=raw: 根本不发送 MessageHeaders,仅序列化有效负载。IE { payload }

我真的只想在创建时发布整个 GenericMessage,包括 Sleuth 添加的跟踪 + 跨度 ID,即:

{"headers": {"id": "x", "trace": "y", "span": "z"}, "payload": { ... }}

除了为 SCS 消费者发布到一个主题,而另一个仅使用有效负载发布之外,还有其他方法可以实现此目的吗?

4

1 回答 1

1

因此,将该消息作为payload出站消息发布:

return MessageBuilder.withPayload(message).build();

这样,您的消息将被序列化为 a payload,并且您可以在正确反序列化并转换payloadMessage.

于 2017-08-17T20:15:38.450 回答