0

使用 Axon 4.3.5 在我的 Saga 中从跟踪模式切换到订阅模式时,我看到了意外的行为

看来,在订阅模式下,当两个线程同时到达两个 @StarSaga 方法时,会为相同的关联键值创建两个 saga。我错过了什么吗?

我有这个来重现它:

@Saga
@ProcessingGroup("Saga")
public class RaceSaga {

    @Inject
    protected transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Exec exec) {
        commandGateway.sendAndWait(new CreateExecCommand(exec.getExecutionId(), exec.getDescription()));
    }

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Risk risk) {
        commandGateway.sendAndWait(new CreateRiskCommand(risk.getExecutionId(), risk.getResult()));
    }
}

@IntegrationTest
class RaceConditionTest extends BaseIntegrationTest {

    @Autowired
    private EventGateway eventGateway;
    @Autowired
    private SagaStore sagaStore;

    @Test
    void sagaRace() {
        var execId = UUID.randomUUID();

        CompletableFuture.runAsync(() -> eventGateway.publish(new Exec(execId.toString(), "desc")));
        CompletableFuture.runAsync(() -> eventGateway.publish(new Risk(execId.toString(), "OK")));

        var association = new AssociationValue("executionId", execId.toString());
        await().during(5, SECONDS)
                .untilAsserted(() -> assertThat(sagaStore.findSagas(RaceSaga.class, association))
                        .hasSize(1));
    }
}

使用跟踪模式时测试通过,但订阅失败。(yml 配置)

4

1 回答 1

0

老实说,考虑到测试设置,这是预期的行为,但需要一些解释。

知道以下是订阅 (SEP) 和跟踪事件处理器 (TEP) 之间的主要区别:

  • SubscribingEventProcessor- 在发布事件的线程中调用EventBus,类似于推送机制。
  • TrackingEventProcessor- 在从 中检索事件的单独线程中调用EventStore,类似于拉机制。

这确保了无论采用何种方法并发发布事件,TEP 都将确保在这种情况下的事件处理顺序。

当涉及到 SEP 时,情况略有不同,我们需要稍微深入研究一下实现。您可以假设两个或更多事件的发布并不过分奇怪。考虑到域内的正确要求,很多聚合实现都可以做到这一点。该框架有一种方法可以将多个事件的这些事务分组在一个批次中。为此,它使用UnitOfWork. 例如,如果您要输入聚合的命令处理函数,则可以确保 aUnitOfWork处于活动状态以协调生命周期。其中一项任务是配对一批事件以进行发布。

但是,在您的测试用例中,您正在EventGateway直接使用。本质上完全没问题,但是测试是在没有开始UnitOfWork协调这两个事件以按顺序发生的情况下设置的。深入研究代码以查看发布到 SEP 的工作原理,您将AbstractEventProcessor在此阶段着陆。进行验证以检查 a在调用UnitOfWork时是否处于活动状态。EventProcessor#publish(List<EventMessage>)如果是这样,则将事件添加到UnitOfWork.

但是,当没有UnitOfWork(UoW)处于活动状态时,将立即调用处理程序。

因此,在使用 a 时TrackingEventProcessor,框架会有意识地启动一个 UoW 来将事件批处理并按顺序处理。使用 a 时SubscribingEventProcessor,这项工作留给用户,假设用户通常会通过 [命令处理 -> 事件发布 -> 事件处理] 的常规流程,这将确保 UoW 处于活动状态。由于在您的集成测试中不是这种情况,因此两个发布操作都将立即调用RaceSaga's SagaManager,由于并发性而创建两个实例。

请注意,建议将 TEP 用于此类过程。为 Saga 使用 SEP 可能意味着您将在应用程序(错误)关闭期间丢失一些事件。由于 SEP 是一种推送机制,因此无法从这些“丢失”(从事件处理器的角度)事件中恢复。TEP 将解决此问题,因为它会自行处理事件并跟踪流程。

相信这一点可以为您澄清@matpiera。

于 2020-10-07T07:59:46.123 回答