1)问题描述:
- 我正在使用 Reactive Spring-Data + Reactive MongoDb + Spring WebFlux;
- 我有 2 个集合(collection):书(Book)和作者(Author)(在下面的代码中对应 Task 和 Project)
2)我的目标是:
- 使用事务:当保存第二个对象时抛出异常,应触发事务回滚(rollback),使 2 个数据库操作全部回滚
3)代码:
3.1) 集合:Task(任务)
/**
 * MongoDB document mapped to the {@code task} collection.
 *
 * <p>NOTE(review): other code in this post calls {@code getName()}, {@code setName(...)} and
 * {@code setProjectId(...)} on {@code Task}; no accessors (or Lombok annotations) are visible
 * in this excerpt, and the field here is {@code taskName}, not {@code name} — confirm how the
 * accessors are generated.
 */
@Document(collection = "task")
public class Task {
// Document identifier (marked with @Id).
@Id
private String _id;
// Persisted under the field name "pid" instead of "projectId".
@Field("pid")
private String projectId;
private String taskName;
private String ownerProject;
}
3.2) 集合:Project(项目)
/**
 * MongoDB document mapped to the {@code project} collection.
 *
 * <p>NOTE(review): other code in this post calls {@code getName()}, {@code setName(...)} and
 * {@code get_id()} on {@code Project}; no {@code name} field and no accessors (or Lombok
 * annotations) are visible in this excerpt — confirm they exist in the real class.
 */
@Document(collection = "project")
public class Project {
// Document identifier (marked with @Id).
@Id
private String _id;
private String ownerProject;
// A single Task held as a field of the project document.
private Task tasksProject;
private String code;
}
3.3) 配置类
// ========================== PropertySource + ConfigurationProperties =============================
// Check - PropertySource: https://www.baeldung.com/configuration-properties-in-spring-boot
// Setters are CRUCIAL for PropertySource + ConfigurationProperties to work properly
/**
 * Reactive MongoDB configuration for the {@code dev} profile.
 *
 * <p>Connection settings are bound from the {@code spring.data.mongodb} prefix (through the
 * Lombok-generated setters) and assembled into a connection URI. The class also exposes the
 * {@link ReactiveMongoTemplate} and the {@link ReactiveMongoTransactionManager} used by
 * {@code @Transactional} reactive methods.
 *
 * <p>The template and the transaction manager must share the same
 * {@link ReactiveMongoDatabaseFactory}: the transaction's session is registered against the
 * factory instance, so a template built on a different factory does not join the transaction.
 *
 * <p>NOTE(review): MongoDB multi-document transactions require a replica set (or sharded
 * cluster); confirm the dev/test MongoDB instance is not a standalone server.
 */
@PropertySource(value = "classpath:application-dev.yml", ignoreResourceNotFound = true)
@ConfigurationProperties(prefix = "spring.data.mongodb")
@Setter
// =================================================================================================
@Profile("dev")
@Configuration
//@EnableTransactionManagement
@EnableReactiveMongoRepositories(
        basePackages = {
                "com.webflux.mongo2.project",
                "com.webflux.mongo2.task.repo"})
public class ConfigDbDevProfile extends AbstractReactiveMongoConfiguration {

    private String database;
    private String host;
    private String port;
    private String username;
    private String password;
    private String authenticationDatabase;

    // 01) REACTIVE-MONGO-TEMPLATE-BEANS:

    /**
     * Builds the reactive Mongo client from the bound connection properties.
     *
     * @return a client created from the assembled connection URI
     */
    @Override
    public MongoClient reactiveMongoClient() {
        String hostAndOptions =
                host + ":" + port + "/" + database +
                "?authSource=" + authenticationDatabase +
                "&authMechanism=SCRAM-SHA-1";

        String connectionURI = "mongodb://" + username + ":" + password + "@" + hostAndOptions;

        // TEMPLATE-URI: mongodb://test:pass@host1:port/?authSource=admin&authMechanism=SCRAM-SHA-1
        // The password is masked in the diagnostic output so credentials never reach the console/logs.
        System.out.println("Connection ---------------> URI ---------------> :"
                + "mongodb://" + username + ":****@" + hostAndOptions);

        // If the property placeholders are not resolved, the driver rejects the URI:
        // Caused by: java.lang.IllegalArgumentException: The connection string contains an
        // invalid host '${HOST}:${PORT}'. The port '${PORT}' is not a valid, it must be an
        // integer between 0 and 65535
        // at com.mongodb.ConnectionString.validatePort(ConnectionString.java:1069)
        // at com.mongodb.ConnectionString.parseHosts(ConnectionString.java:1049)
        // at com.mongodb.ConnectionString.<init>(ConnectionString.java:350)
        // at com.mongodb.reactivestreams.client.MongoClients.create(MongoClients.java:62)
        // at com.webflux.mongo2.core.database.DevConfigDb.reactiveMongoClient(DevConfigDb.java:52)
        return MongoClients.create(connectionURI);
    }

    /**
     * @return the database name bound from the configuration properties
     */
    @Override
    protected String getDatabaseName() {
        return database;
    }

    /**
     * Creates the template on top of the shared database factory bean, i.e. the same factory
     * that is injected into {@link #transactionManager(ReactiveMongoDatabaseFactory)}, so that
     * template operations take part in transactions started by that manager.
     *
     * @return the reactive template backed by the shared database factory
     */
    @Bean
    public ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(reactiveMongoDbFactory());
    }

    // 02) TRANSACTION-BEANS:

    /**
     * Registers the reactive transaction manager for the given database factory.
     *
     * @param factory the database factory whose sessions the manager controls
     * @return the reactive MongoDB transaction manager
     */
    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {
        return new ReactiveMongoTransactionManager(factory);
    }

    // 03) GRID-FS-BEANS:
    // @Bean
    // public ReactiveGridFsTemplate reactiveGridFsTemplate() throws Exception {
    // return new ReactiveGridFsTemplate(reactiveMongoDbFactory(),mongoConverter);
    // }
}
3.4) 服务类(Service)
/**
 * Service that persists a project and its initial task inside one reactive transaction.
 *
 * <p>Exception policy used here: the service raises the domain errors, and the controller
 * is responsible for handling them (on-error-resume).
 */
@Service("serviceTransaction")
@AllArgsConstructor
public class ServiceTransaction implements IServiceTransaction {

    private final ProjectExceptionsThrower projectThrower;
    private final TaskExceptionsThrower taskThrower;
    IServiceRepo serviceRepo;
    ITaskRepo taskRepo;

    /**
     * Validates and saves the project, links the task to the saved project's id, then
     * validates and saves the task.
     *
     * <p>The task is validated only after the project has been saved, so a task validation
     * error is raised after the first write has already been issued.
     *
     * @param project the project to persist
     * @param task    the task to attach to the saved project and persist
     * @return the saved task, or an error signal produced by one of the throwers
     */
    // Alternative kept from the original: @Transactional(transactionManager="transactionManager1")
    @Transactional
    @Override
    public Mono<Task> createProjectTransaction(Project project, Task task) {
        return Mono.just(project)
                .flatMap(this::checkProjectName)
                .flatMap(validProject -> serviceRepo.save(validProject))
                .map(savedProject -> {
                    // Link the task to the freshly saved project.
                    task.setProjectId(savedProject.get_id());
                    return task;
                })
                .flatMap(this::checkTaskName)
                .flatMap(validTask -> taskRepo.save(validTask));
    }

    /** Emits the project unchanged, or the thrower's error when its name is empty. */
    private Mono<Project> checkProjectName(Project candidate) {
        if (candidate.getName().isEmpty()) {
            return projectThrower.throwProjectNameIsEmptyException();
        }
        return Mono.just(candidate);
    }

    /** Emits the task unchanged, or the thrower's error when its name is empty or shorter than three characters. */
    private Mono<Task> checkTaskName(Task candidate) {
        if (candidate.getName().isEmpty()) {
            return taskThrower.throwTaskNameIsEmptyException();
        }
        if (candidate.getName().length() < 3) {
            return taskThrower.throwTaskNameLessThanThreeException();
        }
        return Mono.just(candidate);
    }
}
3.5) 控制器/资源类
package com.webflux.api.modules.project.resource;
// ==> EXCEPTIONS IN CONTROLLER:
// *** REASON: IN WEBFLUX, EXCEPTIONS MUST BE IN CONTROLLER - WHY?
// - "Como stream pode ser manipulado por diferentes grupos de thread,
// - caso um erro aconteça em uma thread que não é a que operou a controller,
// - o ControllerAdvice não vai ser notificado "
// - https://medium.com/nstech/programa%C3%A7%C3%A3o-reativa-com-spring-boot-webflux-e-mongodb-chega-de-sofrer-f92fb64517c3
@RestController
@RequestMapping(REPO_ROOT_TRANSACT)
@AllArgsConstructor
public class ResourceTransaction {
private final ProjectExceptionsThrower projectThrower;
private final TaskExceptionsThrower taskThrower;
private IServiceTransaction serviceTransaction;
@PostMapping(REPO_TRANSACT)
@ResponseStatus(CREATED)
public Mono<Task> createProjectTransaction(
@RequestParam
// @NotEmpty
// @NotNull
String taskNameInitial,
// @Valid
@RequestBody
Project project
) {
Task initialTask = taskNoID("3",
"Mark",
1000L
).create();
initialTask.setName(taskNameInitial);
// @formatter:off
return
serviceTransaction
.createProjectTransaction(project, initialTask)
.onErrorResume(error ->
switch (error) {
case ProjectNameIsEmptyException ignored -> projectThrower.throwProjectNameIsEmptyException();
case ProjectNotFoundException ignored -> projectThrower.throwProjectNotFoundException();
case TaskNameIsEmptyException ignored -> taskThrower.throwTaskNameIsEmptyException();
case TaskNameLessThanThreeException ignored -> taskThrower.throwTaskNameLessThanThreeException();
default -> Mono.error(new ResponseStatusException(NOT_FOUND));
})
;
// @formatter:on
}
}
3.6) 我的测试类
package com.webflux.api.modules.project.core.exceptions;
/**
 * Integration test for the transactional "create project + task" endpoint: it posts a request
 * whose task name is rejected and then re-counts both collections to check that nothing new
 * was persisted.
 */
@Import({TestDbUtilsConfig.class})
@DisplayName("ResourceTransactionExcTest")
@MergedResource
class ResourceTransactionExcTest {

    // STATIC-@Container: one service for ALL tests -> SUPER FASTER
    // NON-STATIC-@Container: one service for EACH test
    @Container
    private static final DockerComposeContainer<?> compose = new TcComposeConfig().getTcCompose();

    final String enabledTest = "true";

    // MOCKED-SERVER: WEB-TEST-CLIENT(non-blocking client)'
    // SHOULD BE USED WITH 'TEST-CONTAINERS'
    // BECAUSE THERE IS NO 'REAL-SERVER' CREATED VIA DOCKER-COMPOSE
    @Autowired
    WebTestClient mockedWebClient;

    @Autowired
    TestDbUtils dbUtils;

    @Autowired
    IServiceCrud serviceCrud;

    @Autowired
    IServiceTask taskService;

    private Project projetoWithId;

    /** Prints the class-start banners and resets the RestAssured request/response specifications. */
    @BeforeAll
    static void beforeAll(TestInfo testInfo) {
        globalBeforeAll();
        globalTestMessage(testInfo.getDisplayName(), "class-start");
        globalComposeServiceContainerMessage(compose,
                TC_COMPOSE_SERVICE,
                TC_COMPOSE_SERVICE_PORT
        );
        RestAssuredWebTestClient.reset();
        RestAssuredWebTestClient.requestSpecification =
                requestSpecsSetPath("http://localhost:8080" + PROJ_ROOT_CRUD);
        RestAssuredWebTestClient.responseSpecification = responseSpecs();
    }

    @AfterAll
    static void afterAll(TestInfo testInfo) {
        globalAfterAll();
        globalTestMessage(testInfo.getDisplayName(), "class-end");
    }

    /** Seeds the database with two projects and one task before each test. */
    @BeforeEach
    void beforeEach(TestInfo testInfo) {
        globalTestMessage(testInfo.getTestMethod()
                .toString(), "method-start");

        Project project1 = projecNoID("C",
                "2020-05-05",
                "2021-05-05",
                1000L,
                of("UK", "USA")
        ).create();

        projetoWithId = projectWithID("C",
                "2020-05-05",
                "2021-05-05",
                1000L,
                of("HOL", "CAN")
        ).create();

        List<Project> projectList = asList(project1, projetoWithId);
        Flux<Project> projectFlux = dbUtils.saveProjectList(projectList);
        dbUtils.countAndExecuteFlux(projectFlux, 2);

        Task task1 = taskWithID("3",
                "Mark",
                1000L
        ).create();
        Flux<Task> taskFlux = dbUtils.saveTaskList(singletonList(task1));
        dbUtils.countAndExecuteFlux(taskFlux, 1);
    }

    @AfterEach
    void tearDown(TestInfo testInfo) {
        globalTestMessage(testInfo.getTestMethod()
                .toString(), "method-end");
    }

    /**
     * Posts a project with a task name that the service rejects and expects NOT_ACCEPTABLE,
     * with the project and task counts unchanged afterwards (2 projects, 1 task).
     */
    @Test
    @EnabledIf(expression = enabledTest, loadContext = true)
    @DisplayName("createProjectTransacTaskNameEmptyExc")
    public void createProjectTransacTaskNameEmptyExc() {
        // Baseline: the data seeded in beforeEach.
        dbUtils.countAndExecuteFlux(serviceCrud.findAll(), 2);
        dbUtils.countAndExecuteFlux(taskService.findAll(), 1);

        Project project = projectWithID("C",
                "2020-05-05",
                "2021-05-05",
                1000L,
                of("UK", "USA")
        ).create();
        project.setName("xx");

        // NOTE(review): the test name says "TaskNameEmpty", but a two-character name hits the
        // service's "less than three" check rather than the "empty" one — confirm the intent.
        String newTaskName = "XX";

        RestAssuredWebTestClient
                .given()
                .webTestClient(mockedWebClient)
                .body(project)
                .queryParam("taskNameInitial", newTaskName)
                .when()
                .post(REPO_TRANSACT)
                .then()
                .log()
                .everything()
                .statusCode(NOT_ACCEPTABLE.value())
                // .body(matchesJsonSchemaInClasspath("contracts/project/createProjectTransaction"))
        ;

        // After the failed request the counts must be unchanged, i.e. the project save was rolled back.
        dbUtils.countAndExecuteFlux(serviceCrud.findAll(), 2);
        dbUtils.countAndExecuteFlux(taskService.findAll(), 1);
    }

    @Test
    @EnabledIf(expression = enabledTest, loadContext = true)
    @DisplayName("BHWorks")
    public void bHWorks() {
        bhWorks();
    }
}
4)问题:
- 如何使用 @Transactional 注解来回滚已经完成的数据库操作?
- 我的测试检测到 2 个对象都被持久化到了数据库中(即没有发生回滚)
非常感谢您的帮助