0

作为 Spring Reactor 的新手,我正在尝试使用 Spring 云流(使用 rabbitMQ)流式传输数据。在将消息发送到队列之前,我需要添加一些自定义标头。

我的 spring-cloud-stream 的配置是:

spring:
  cloud:
    stream:
      default:
        producer:
          errorChannelEnabled: true
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

      binders:
        rabbitInput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5672
                host: localhost

        rabbitOutput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5670
                host: localhost 

生产者参考:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {

    public static void main(String[] args) {
        SpringApplication.run(MessageProcessor.class, args);
    }

    @Bean
    Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
        return data -> data.map(d -> match(d, students));

    }
    private String match(String message, List<String> students){
        return Objects.isNull(message) || message.isBlank()
            ? message
            : String.valueOf(matchStudentName(message, students));
    }

    private Optional<String> matchStudentName(String message, List<String> students){
        return students.stream()
        .filter(name -> name.equals(message)).findFirst();
    }
    @Bean
    Function<Flux<String>, Flux<Message<String>>> addHeaders() {
        return data-> data.map(d-> MessageBuilder
            .withPayload( d )
            .setHeader("a", 1)
            .setHeader("b", "999")
            .build());
    }
}

标头已成功添加到消息中,但它在某处被覆盖并且没有传播给消费者。

有人可以分享他们对我们如何使用 Spring Cloud Stream 将自定义标头添加到消息的想法。

提前致谢!

4

1 回答 1

0

请升级到 Hoxton.SR2,它将带来 spring-cloud-stream 3.0.2.RELEASE。有一些更新,但简而言之,您正在生成的消息和其中的标题应该被保留。

旁注: 此外,由于增加了对多个输入/输出函数参数的支持,我们必须更新函数的绑定名称约定。您可以在此处阅读有关它的更多信息,但这对您意味着您的配置需要快速更新,因为默认情况inputoutput不再使用,因此您应该使用从函数名派生的名称

spring:
  cloud:
    stream:
      bindings:
        processMessageaddHeaders-in-0:
          binder: rabbitInput
          destination: inputDestination
        processMessageaddHeaders-out-0:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

. . . 或者您可以将派生的绑定名称映射到更具描述性的名称(例如inputoutput等)并改用该名称

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders
        bindings: 
          processMessageaddHeaders-in-0: input  
          processMessageaddHeaders-out-0: output


于 2020-02-19T08:07:24.550 回答