2

我刚刚开始学习 Spring Cloud Streams 和 Dataflow,我想知道对我来说重要的用例之一。我创建了示例处理器乘法器,它接收消息并将其重新发送 5 次以输出。

@EnableBinding(Processor.class)
public class MultiplierProcessor {
    @Autowired
    private Source source;

    private int repeats = 5;

    @Transactional
    @StreamListener(Processor.INPUT)
    public void handle(String payload) {
        for (int i = 0; i < repeats; i++) {
            if(i == 4) {
                throw new RuntimeException("EXCEPTION");
            }
            source.output().send(new GenericMessage<>(payload));
        }
    }
}

您可以看到,在第 5 次发送此处理器之前崩溃。为什么?因为它可以(程序抛出异常)。在这种情况下,我想在 Spring Cloud Stream 上练习故障预防。

我想要实现的是在 DLQ 中支持输入消息和之前发送的 4 条消息被还原并且不被下一个操作数使用(就像在正常的 JMS 事务中一样)。我已经尝试在我的处理器项目中定义以下属性,但没有成功。

spring.cloud.stream.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.bindings.output.producer.republishToDlq=true
spring.cloud.stream.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.consumer.autoBindDlq=true

你能告诉我是否可能,还有我做错了什么?我会非常感谢一些例子。

4

1 回答 1

4

您的配置有几个问题:

  • .rabbit兔子特有的属性中缺少)
  • 您需要一个组名和持久订阅才能使用autoBindDlq
  • autoBindDlq不适用于输出端

必须对消费者进行事务处理,以便生产者发送在同一事务中执行。

我刚刚用 1.0.2.RELEASE 测试了这个:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true

它按预期工作。

编辑

实际上,不,发布的消息没有回滚。正在调查...

编辑2

好的; 它确实有效,但您不能使用republishToDlq- 因为启用该功能后,活页夹将失败的消息发布到 DLQ 并提交事务。

如果为 false,则向容器抛出异常,回滚事务,并且 RabbitMQ 将失败的消息移动到 DLQ。

但是请注意,默认情况下会启用重试(3 次尝试),因此,如果您的处理器在重试期间成功,您将在输出中获得重复项。

为此,您需要通过将最大尝试次数设置为 1 来禁用重试(并且不要使用republishToDlq)。

编辑3

好的,如果您想更好地控制错误的发布,当此 JIRA的修复应用于 Spring AMQP 时,这将起作用...

@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {

    public static void main(String[] args) {
        SpringApplication.run(So39018400Application.class, args);
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    public interface Errors {

        @Output("errors")
        MessageChannel errorChannel();

    }

    private static class Foo {

        @Autowired
        Source source;

        @Autowired
        Errors errors;

        @StreamListener(Processor.INPUT)
        public void handle (Message<byte[]> in) {
            try {
                source.output().send(new GenericMessage<>("foo"));
                source.output().send(new GenericMessage<>("foo"));
                throw new RuntimeException("foo");
            }
            catch (RuntimeException e) {
                errors.errorChannel().send(MessageBuilder.fromMessage(in)
                        .setHeader("foo", "bar") // add whatever you want, stack trace etc.
                        .build());
                throw e;
            }
        }

    }

}

具有属性:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.errors.destination=so8400errors
spring.cloud.stream.rabbit.bindings.errors.producer.transacted=false


spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=false
spring.cloud.stream.bindings.input.consumer.max-attempts=1
于 2016-08-18T14:01:07.567 回答