我正在尝试使用 Micronaut AMQP 执行请求/响应模式,如下图
制片人
@RabbitClient(ProductTopicConstants.FETE_BIRD_EXCHANGE)
@RabbitProperty(name = "replyTo", value = "amq.rabbitmq.reply-to")
public interface IProductProducer {
@Binding(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
Flowable<Product> findFreeText(String text);
}
听众
@RabbitListener
public class ProductListener {
@Queue(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
public Flowable<Product> findByFreeText(String text) {
LOG.info(String.format("Listener --> Listening value = %s", text));
return Flowable.fromPublisher(repository.getCollection("product", Product.class)
.find(new Document("$text",
new Document("$search", text)
.append("$caseSensitive", false)
.append("$diacriticSensitive", false)
)));
}}
对于它正在工作的请求,了解如何执行返回给控制器的响应以及如何使用 AMQP 协议和 micronaut 与消息代理异步实现此模式。