我试图在同一个事务中执行 3 个插入,但是当其中一个插入失败时,我无法让事务回滚。
我是响应式世界的新手,这是我的第一个响应式应用程序。
这是数据库模型的简化:
EntityA 1---N EntityB
EntityA 1---N EntityC
我想在同一个事务中执行以下插入:
INSERT INTO A
INSERT INTO B --(failing query)
INSERT INTO C
但是,当第二次插入失败时,第一次插入不会回滚。
我有以下课程:
Processor
: 接收来自 kafka 的消息并通过 Service 触发插入Service
:使用 3 个 DAO 运行 3 个插入EntityADao
: 运行实体 A 的插入EntityBDao
: 运行实体 B 的插入EntityBDao
: 运行实体 C 的插入
@ApplicationScoped
public class Processor {
private final Service service;
public Processor(final Service service) {
this.service = service;
}
@Incoming("input-channel")
@Outgoing("output-channel")
public Uni<Message<RequestMessage>> process(final Message<RequestMessage> message) {
final RequestMessage rm = message.getPayload();
return service.saveEntities(rm)
.onFailure()
.recoverWithItem(e -> {
final String errorMessage = "There was an unexpected error while saving entities";
LOG.error(errorMessage, e);
return Result.KO;
})
.flatMap(result -> {
rm.setResult(result);
return Uni.createFrom()
.item(Message.of(rm), message::ack))
});
}
}
@ApplicationScoped
public class WorkerService {
private final EntityADao entityADao;
private final EntityBDao entityBDao;
private final EntityCDao entityCDao;
public WorkerService(final EntityADao entityADao,
final EntityBDao entityBDao,
final EntityCDao entityCDao) {
this.entityADao = entityADao;
this.entityBDao = entityBDao;
this.entityCDao = entityCDao;
}
@Transactional(TxType.REQUIRED)
public Uni<Result> saveEntities(final RequestMessage requestMessage) {
return Uni.createFrom().item(Result.OK)
// Save Entity A
.flatMap(result -> {
LOG.debug("(1) Saving EntityA ...");
return entityADao.save(requestMessage.getEntityAData());
})
// Save Entity B
.flatMap(result -> {
LOG.debug("(2) Saving EntityB ...");
return entityBDao.save(requestMessage.getEntityBData());
})
// Save Entity C
.flatMap(result -> {
LOG.debug("(3) Saving EntityC ...");
return entityCDao.dao(requestMessage.getEntityCData());
})
// Return OK
.flatMap(result -> Uni.createFrom().item(Result.OK));
}
}
@ApplicationScoped
public class EntityADao {
private final PgPool client;
public EntityADao(final PgPool client) {
this.client = client;
}
@Transactional(TxType.MANDATORY)
public Uni<Result> save(final EntityAData entityAData) {
return client
.preparedQuery(
"INSERT INTO A(col1, col2, col3) " +
"VALUES ($1, $2, $3)")
.execute(Tuple.of(entityAData.col1(), entityAData.col2(), entityAData.col3()))
.flatMap(pgRowSet -> {
LOG.debug("Inserted EntityA!");
return Result.OK;
});
}
}
EntityBDao
并且EntityCDao
喜欢EntityADao
。
我已经将以下依赖项添加到pom.xml
:
quarkus-smallrye-context-propagation
quarkus-narayana-jta
为什么在INSERT B
查询EntityBDao
失败时,之前执行的查询(INSERT A
)没有回滚?我错过了什么?为了让它工作,我必须改变什么?