3

我正在使用带有弹簧云功能的弹簧云流兔子活页夹并定义监听器,例如:

public Function<Flux<SomeObject>, Flux<OtherObject>> foo() {
//some code
}

我还将失败的消息重新路由到 DLQ。问题是发生致命错误org.springframework.messaging.converter.MessageConversionException时。它不会像https://docs.spring.io/spring-amqp/reference/html/#exception-handlingConditionalRejectingErrorHandler中提到的那样得到处理,并且永远循环。

有没有办法让这个工作ConditionalRejectingErrorHandler

现在我通过@ServiceActivator(inputChannel = "errorChannel")自己使用和处理错误来解决问题。

依赖项:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-consul-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-hateoas</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-web</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
</dependencies>
4

1 回答 1

1

我们长期以来一直在争论用于命令式函数的错误处理和其他特性,以及它们如何应用于(或者甚至可以应用于)响应式函数,并尝试了一些不同的方法,但不幸的是,这一切都归结为阻抗不匹配。

您描述的方法基于对单个消息的操作。这是命令式消息处理程序中的工作单元,例如Function<String, String>. 您使用响应式样式,并通过这样做将工作单元从流中的单个消息更改为整个流。

简而言之:

- Function<?, ?> - unit of work is Message
- Function<Flux<?>, Flux<?>> - unit of work is the entire stream

您还可以轻松地观察到它,因为响应式函数在应用程序的生命周期内仅调用一次,而命令式函数在每个到达的消息中调用一次。我这么说的原因是我们用于命令式消息处理程序(函数)的基于框架的方法不能应用于响应式而不引起副作用。并且通常反应式开发人员理解这一点,特别是考虑到反应式 API 的丰富性,特别是在错误处理方面

无论如何,我们都会相应地更新文档。

于 2020-02-27T14:57:45.847 回答