0

我有一个问题,我必须在 Spring-Boot(没有 Axon 服务器)中使用 Atomikos 和 Axon 框架。我正在使用 Oracle DB,并且我正在使用多个线程 (10) 来发送大量命令,在此之前我正在为自己配置一个 JtaTransactionManager,但在某些线程中我得到了这种异常:javax.transaction.xa。 XAException,引发 -6 或 -4 或 -3 或 ORA-02056:2PC:k2lcom:错误的两阶段命令号 rdonly 来自 coord:。当我调试时,我看到 CommandGateWay 也在使用 JtaTransactionManager。这样对吗?这是什么时候开始交易?我的 JtaTransactionManager 和 Axon 有可能发生冲突吗?有人遇到过这种例外吗?

示例代码:

@Service
public class CreateEntitiesServiceImpl extends FutureCompleter implements CreateEntitiesService {

    private static Logger logger = LoggerHelper.getDeveloperLogger(CreateEntitiesServiceImpl.class);
    private final CommandGateway commandGateway;
    private final ExecutionUtil executionUtil;
    private final MyEntityRepository myEntityRepository;

    public CreateEntitiesServiceImpl(CommandGateway commandGateway, ExecutionUtil executionUtil, MyEntityRepository myEntityRepository) {
        this.commandGateway = commandGateway;
        this.executionUtil = executionUtil;
        this.myEntityRepository = myEntityRepository;
    }

    @Override
    public void process(Message message) {
        logger.info("Entity addition started!");
        generateEntities();
        logger.info("Entity addition finished!");
    }

    private void generateEntities() {
        ExecutorService executorService = executionUtil.createExecutor(10, "createEntities");

        List<Integer> list = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());

        CreateEntitiesService proxy = applicationContext.getBean(CreateEntitiesServiceImpl.class);

        List<CompletableFuture<Void>> processingFutures = list.stream().map(
                e -> CompletableFuture.runAsync(proxy::createEntity, executorService).whenComplete((x, y) -> executorService.shutdown()))
                .collect(Collectors.toList());

        processingFutures.stream().map(this::getVoidFuture).collect(Collectors.toList());
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void createEntity() {
        try {
            MyEntity myEntity = new MyEntity();
            myEntity.setEntityStringProperty("string");
            myEntity.setEntityTimestampProperty(LocalDateTime.now());

            MyEntity savedEntity = myEntityRepository.save(myEntity);
            CreateAggregateCommand command = new CreateAggregateCommand(savedEntity.getEntityId(), savedEntity.getEntityStringProperty(),
                    savedEntity.getEntityTimestampProperty());
            commandGateway.send(command);
        } catch (Exception e) {
            throw new CreateEntitiesException(e.getMessage(), e);
        }
    }
}

谢谢

4

1 回答 1

0

我不完全理解你要说实话。

标题将 Axon 与 Atomikos 结合在一起,而我认为这一点永远不会在描述中出现。

CommandGateway您在问题描述中要问的是使用事务管理器是否正确。

不过,在这件事上,我可以很清楚:是的。发送命令很可能最终会出现在聚合实例中。由于您希望保护此实例中的一致性边界确保已发布的事件得到存储,因此在处理命令后立即启动事务是明智的。

顺便说一句,小注意:它是CommandBus使用TransactionManager. 从概念上讲,到目前为止,这并没有改变我的描述。

最后,我不完全确定这是否会对您有所帮助,因为您的问题对我来说并不完全清楚。我希望重写你的帖子能更清楚地说明你真正追求的是什么。

于 2020-02-26T20:06:56.100 回答