作为 Spring Reactor 的新手,我正在尝试使用 Spring 云流(使用 rabbitMQ)流式传输数据。在将消息发送到队列之前,我需要添加一些自定义标头。
我的 spring-cloud-stream 的配置是:
spring:
cloud:
stream:
default:
producer:
errorChannelEnabled: true
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
binders:
rabbitInput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5672
host: localhost
rabbitOutput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5670
host: localhost
生产者参考:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {
public static void main(String[] args) {
SpringApplication.run(MessageProcessor.class, args);
}
@Bean
Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
return data -> data.map(d -> match(d, students));
}
private String match(String message, List<String> students){
return Objects.isNull(message) || message.isBlank()
? message
: String.valueOf(matchStudentName(message, students));
}
private Optional<String> matchStudentName(String message, List<String> students){
return students.stream()
.filter(name -> name.equals(message)).findFirst();
}
@Bean
Function<Flux<String>, Flux<Message<String>>> addHeaders() {
return data-> data.map(d-> MessageBuilder
.withPayload( d )
.setHeader("a", 1)
.setHeader("b", "999")
.build());
}
}
标头已成功添加到消息中,但它在某处被覆盖并且没有传播给消费者。
有人可以分享他们对我们如何使用 Spring Cloud Stream 将自定义标头添加到消息的想法。
提前致谢!