1

我正在尝试将单个输入消息转换为多个消息。我有一个带有以下签名的方法:

@Incoming("CH_IN")
@Outgoing("CH_OUT")
Multi<Message<B>> process(Message<A> in) {

}

A是这样的:

class A {
    private List<String> ids

    // getters and setters
}

对于每一个我想创建一个id. 我怎么能这样做并处理消息的确认?ABin

这是我所拥有的简化,但我不确定这是否是正确的做法:

@Incoming("CH_IN")
@Outgoing("CH_OUT")
Multi<Message<B>> process(Message<A> in) {
    List<Message<B>> out = new ArrayList<>();

    // Code to iterate ids and create instances of Message<B>
    // In certain cases the out list will be empty

    return Multi.createFrom().iterable(out).on().completion(in::ack);
}

这是确认消息的正确方法吗?

4

1 回答 1

0

这是对我所拥有的东西的简化,但我不确定这是否是正确的做法

只要您在评论处理部分不需要任何阻塞调用,它就可以工作,但恕我直言,这不是理想的:

  • 从风格上讲,在同一方法中混合命令式和反应式代码不是很整洁
  • 从功能上讲,如果您需要(现在或将来),这种方法将不允许您在处理中调用任何阻塞代码。

相反,我会提倡类似:

return Multi.createFrom().item(in)
        .flatMap(m -> Uni.createFrom().item(m).repeat().atMost(5)) //Replace with your actual processing
        .on().completion(in::ack);

...它将整个方法保持为一个反应链,并让您能够在flatMap()需要时访问其他发布者(例如,如果您的处理需要 Web 调用、数据库调用或其他一些阻塞 IO。)

这是确认消息的正确方法吗?

只要您只希望 ack 在处理完成时发生,就可以。

唯一的缺点是,如果在处理的中途发生灾难性故障,可以想象,有些可以在in被确认之前处理。唯一其他明智的行为是on().subscribed()改用,这只会产生相反的问题(in将被确认,但部分或所有消息可能不会发布到您的传出频道。)哪个更可取取决于您的用例。

于 2020-07-01T16:11:31.813 回答