0

我们正在使用带有 RabbitMQ 的 Spring Cloud Reactive Streams。

Spring Reactive Stream 似乎在将消息从队列中拉出后立即确认消息。因此,在消息处理期间发生的任何错误未处理异常都需要在应用程序中处理(这与可以抛出未处理异常并且消息将被拒绝的非反应性流不同,从而将其发送到死信队列)。

当消息在传输中时,我们应该如何处理应用程序中的突然关闭?

例如:

  • 应用程序从队列中拉出消息
  • 应用程序将消息标记为已确认
  • 应用程序开始消息处理
  • 应用程序在消息处理完成之前关闭

发生这种情况时,消息似乎完全丢失了,因为它不在队列中,但应用程序已停止。我们如何才能恢复这些消息?

4

1 回答 1

1

您需要使用手动确认并推迟确认,直到处理异步完成。为此,您需要使用整个消息:

    @Bean
    public Consumer<Flux<Message<String>>> async() {
        return inbound -> inbound

                ...

                .map(msg -> {
                    try {
                        msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                                .basicAck(msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                    return msg.getPayload();
                })
                .subscribe(System.out::println);
    }
spring:
  cloud:
    stream:
      function.definition: async
      bindings:
        async-in-0:
          destination: testtock
          group: async
      rabbit:
        bindings:
          async-in-0:
            consumer:
              acknowledge-mode: MANUAL
              prefetch: 10

用于basicReject重新排队或发送到 DLQ。

于 2020-04-09T22:26:30.147 回答