我遇到了一个问题:我必须在 Spring Boot(不使用 Axon Server)中同时使用 Atomikos 和 Axon Framework。我使用的是 Oracle 数据库,并用多个线程(10 个)发送大量命令;在此之前,我自己配置了一个 JtaTransactionManager。但在某些线程中我会遇到这样的异常:javax.transaction.xa.XAException(错误码为 -6、-4 或 -3),或者 ORA-02056: 2PC: k2lcom: bad two-phase command number rdonly from coord。调试时我发现 CommandGateway 也在使用这个 JtaTransactionManager。这样是正确的吗?事务是在什么时候开始的?我配置的 JtaTransactionManager 是否可能与 Axon 发生冲突?有人遇到过这类异常吗?
示例代码:
/**
 * Creates a fixed number of {@code MyEntity} rows in parallel and sends one
 * {@code CreateAggregateCommand} per saved entity through the {@link CommandGateway}.
 *
 * <p>Each entity is created by {@link #createEntity()}, which is annotated with
 * {@code @Transactional(propagation = Propagation.REQUIRES_NEW)}. The method is invoked
 * through a bean looked up from the application context rather than through {@code this}
 * (presumably so the transactional proxy is applied; verify against the Spring configuration).
 */
@Service
public class CreateEntitiesServiceImpl extends FutureCompleter implements CreateEntitiesService {

    private static final Logger logger = LoggerHelper.getDeveloperLogger(CreateEntitiesServiceImpl.class);

    /** Number of worker threads requested from {@code ExecutionUtil#createExecutor}. */
    private static final int WORKER_THREADS = 10;

    /** Number of entities (and therefore commands) produced by one {@link #process} call. */
    private static final int ENTITY_COUNT = 1000;

    private final CommandGateway commandGateway;
    private final ExecutionUtil executionUtil;
    private final MyEntityRepository myEntityRepository;

    public CreateEntitiesServiceImpl(CommandGateway commandGateway, ExecutionUtil executionUtil,
            MyEntityRepository myEntityRepository) {
        this.commandGateway = commandGateway;
        this.executionUtil = executionUtil;
        this.myEntityRepository = myEntityRepository;
    }

    /**
     * Runs the bulk entity creation and logs its start and end.
     *
     * @param message the triggering message; it is not inspected by this implementation
     */
    @Override
    public void process(Message message) {
        logger.info("Entity addition started!");
        generateEntities();
        logger.info("Entity addition finished!");
    }

    /**
     * Submits {@code ENTITY_COUNT} {@link #createEntity()} tasks to a dedicated executor and
     * hands every resulting future to {@code getVoidFuture}.
     *
     * <p>The executor is shut down exactly once, after all tasks have been submitted. Shutting
     * it down from a per-task completion callback could reject tasks that had not been
     * submitted yet when the first task finished.
     */
    private void generateEntities() {
        ExecutorService executorService = executionUtil.createExecutor(WORKER_THREADS, "createEntities");
        try {
            // Go through the container-managed bean instead of `this`.
            CreateEntitiesService proxy = applicationContext.getBean(CreateEntitiesServiceImpl.class);

            List<CompletableFuture<Void>> processingFutures = IntStream.rangeClosed(1, ENTITY_COUNT)
                    .mapToObj(i -> CompletableFuture.runAsync(proxy::createEntity, executorService))
                    .collect(Collectors.toList());

            // NOTE(review): getVoidFuture is defined outside this class; presumably it waits
            // for the future to complete - confirm in FutureCompleter.
            processingFutures.forEach(this::getVoidFuture);
        } finally {
            // shutdown() is orderly: tasks that were already submitted still run to completion.
            executorService.shutdown();
        }
    }

    /**
     * Saves a new {@code MyEntity} and sends a {@code CreateAggregateCommand} built from the
     * saved entity's id, string property and timestamp property.
     *
     * @throws CreateEntitiesException wrapping any exception raised while saving or sending
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void createEntity() {
        try {
            MyEntity myEntity = new MyEntity();
            myEntity.setEntityStringProperty("string");
            myEntity.setEntityTimestampProperty(LocalDateTime.now());
            MyEntity savedEntity = myEntityRepository.save(myEntity);

            CreateAggregateCommand command = new CreateAggregateCommand(
                    savedEntity.getEntityId(),
                    savedEntity.getEntityStringProperty(),
                    savedEntity.getEntityTimestampProperty());

            // NOTE(review): the value returned by send(...) is ignored, so this method does not
            // wait for the command result. Confirm whether command handling is expected to
            // finish before this transaction commits.
            commandGateway.send(command);
        } catch (Exception e) {
            throw new CreateEntitiesException(e.getMessage(), e);
        }
    }
}
谢谢