When using the reactive programming model of Spring Data MongoDB, you can execute a transaction like this:

Mono<DeleteResult> result = template.inTransaction()                                      
    .execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); 

But Spring Data MongoDB also supports "reactive repositories", for example:

public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  Flux<Person> findByLocationNear(Point location, Distance distance);
}

public interface CarRepository extends ReactiveMongoRepository<Car, String> {

  Flux<Car> findByYear(int year);
}

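My question is: given that you have ReactiveMongoRepository interfaces, can you somehow leverage MongoDB transactions, e.g. insert both a Person and a Car in the same transaction (using PersonRepository and CarRepository in this case)? If so, how do you do it?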

2 Answers

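I had also been struggling to find a solution for transaction support with reactive MongoDB and Spring Boot.

Luckily I figured it out myself. A few things I found through Google helped too, but those were all non-reactive.

Important note: this works fine with Spring Boot 2.2.x, but with Spring Boot 2.3.x there are some other issues, since that release comes with internal rewrites and changes.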

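  • You need to use ReactiveMongoTransactionManager together with ReactiveMongoDatabaseFactory. Most of the details are at the end, and I am also sharing a code repository for the same.

  • For MongoDB to support transactions, the database must be running as a replica set.

    Why do we need that? Because otherwise you get an error like this:

    Sessions are not supported by the MongoDB cluster to which this client is connected

The instructions for setting this up are as follows:

  1. Run a docker-compose based MongoDB server using the docker-compose.yml shared below: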
version: "3"
services:
    mongo:
        hostname: mongo
        container_name: localmongo_docker
        image: mongo
        expose:
          - 27017
        ports:
          - 27017:27017
        restart: always
        entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
        volumes:
          - ./mongodata:/data/db # need to create a docker volume named as mongodata first

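  2. Once the container is up, run the following command (localmongo_docker is the container name; on MongoDB 6.0 and later images the shell binary is mongosh rather than mongo):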
docker exec -it localmongo_docker mongo
  3. Copy and paste the command below and run it:
rs.initiate(
   {
     _id : 'rs0',
     members: [
       { _id : 0, host : "mongo:27017" }
     ]
   }
 )
  4. Then type exit to leave the shell.

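Important: the code repository can be found on my GitHub: https://github.com/krnbr/mongo-spring-boot-template

Key notes on the code:

  • The MongoConfiguration class in the config package is the key piece that makes transactions work.

  • The main part is this bean: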

     @Bean
     ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
         return new ReactiveMongoTransactionManager(dbFactory);
     }
    
  • To see the transactional behavior at work, look at the UserService class in the service package.
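
To connect this back to the original question, here is a minimal sketch of how the ReactiveMongoTransactionManager bean above could be used to save a Person and a Car through their repositories in one transaction. It uses Spring's TransactionalOperator as a programmatic alternative to @Transactional. Person, Car, PersonRepository and CarRepository are the types from the question; PersonCarService and savePersonAndCar are hypothetical names chosen for illustration, and the sketch assumes a replica set is running as described above.

```java
import org.springframework.stereotype.Service;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Mono;

// Hypothetical service; Person, Car and the two repositories come from the question.
@Service
public class PersonCarService {

    private final PersonRepository personRepository;
    private final CarRepository carRepository;
    private final TransactionalOperator transactionalOperator;

    public PersonCarService(PersonRepository personRepository,
                            CarRepository carRepository,
                            ReactiveTransactionManager transactionManager) {
        this.personRepository = personRepository;
        this.carRepository = carRepository;
        // Builds an operator that runs reactive pipelines inside a transaction
        // managed by the injected ReactiveMongoTransactionManager bean.
        this.transactionalOperator = TransactionalOperator.create(transactionManager);
    }

    public Mono<Car> savePersonAndCar(Person person, Car car) {
        // Save the person first, then the car; nothing runs until subscription.
        Mono<Car> pipeline = personRepository.save(person)
                .then(carRepository.save(car));
        // If either save signals an error, the whole transaction is rolled back.
        return transactionalOperator.transactional(pipeline);
    }
}
```

Annotating savePersonAndCar with @Transactional and returning the pipeline directly should behave the same way, as long as both repository calls are part of the one Mono that is returned.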

In case the links do not work for someone, here is the code:

Configuration and beans

@Configuration
public class MongoConfiguration extends AbstractMongoClientConfiguration {

    @Autowired
    private MongoProperties mongoProperties;

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Override
    protected String getDatabaseName() {
        return mongoProperties.getDatabase();
    }

    @Override
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }
}

application.properties (MongoDB-related)

spring.data.mongodb.database=mongo
spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0

Document classes

Role class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "roles")
@TypeAlias("role")
public class Role implements Persistable<String> {

    @Id
    private String id;

    @Field("role_name")
    @Indexed(unique = true)
    private String role;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }
}

User class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "users")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user")
public class User implements Persistable<String> {

    @Id()
    private String id;

    @Field("username")
    @Indexed(unique = true)
    @JsonProperty("username")
    private String userName;

    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private String password;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @DBRef(lazy = true)
    @JsonProperty("roles")
    private List<Role> roles = new ArrayList<>();

    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }
}

UserProfile class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "user_profiles")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user_profile")
public class UserProfile implements Persistable<String> {

    @Id
    private String id;

    @Indexed(unique = true)
    private String mobile;

    @Indexed(unique = true)
    private String email;

    private String address;

    private String firstName;

    private String lastName;

    @DBRef
    private User user;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }

}

ReactiveMongoRepository interfaces

RoleRepository

public interface RoleRepository extends ReactiveMongoRepository<Role, String> {

    Mono<Role> findByRole(String role);

    Flux<Role> findAllByRoleIn(List<String> roles);

}

UserRepository

public interface UserRepository extends ReactiveMongoRepository<User, String> {

    Mono<User> findByUserName(String userName);

}

UserProfileRepository

public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> {
}

UserService class. You need to create your own RuntimeException subclass here; AppRuntimeException is the one I have been using.

@Slf4j
@Service
public class UserService {

    @Autowired
    private RoleRepository roleRepository;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private UserProfileRepository userProfileRepository;

    @Transactional
    public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) {

        Mono<Role> roleMono = roleRepository.findByRole("USER");

        Mono<User> userMono = roleMono.flatMap(r -> {
            User user = new User()
                    .setUserName(userRequest.getUsername())
                    .setPassword(userRequest.getPassword());
            user.setRoles(Arrays.asList(r));
            return userRepository.save(user);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the username '"+userRequest.getUsername()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        Mono<UserProfile> userProfileMono = userMono.flatMap(u -> {
            UserProfile userProfile = new UserProfile()
                    .setAddress(userRequest.getAddress())
                    .setEmail(userRequest.getEmail())
                    .setMobile(userRequest.getMobile())
                    .setUser(u);
            return userProfileRepository.save(userProfile);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the profile mobile '"+userRequest.getMobile()+"' and/or - email '"+userRequest.getEmail()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        return userProfileMono;

    }

}

Controller and model classes

UserRequest model class

@Getter
@Setter
@Accessors(chain = true)
@Slf4j
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UserRequest {

    private String username;
    private String password;
    private String mobile;
    private String email;
    private String address;
    private String firstName;
    private String lastName;

}

UserProfileApisController

@Slf4j
@RestController
@RequestMapping("/apis/user/profile")
public class UserProfileApisController {

    @Autowired
    private UserService userService;

    @PostMapping
    public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) {
        return userService.saveUserAndItsProfile(userRequest);
    }

}
answered 2020-05-08T09:39:11.917

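Just an addition to the accepted answer regarding MongoDB replica set initialization.

  1. If you need a single-node replica set on a non-fixed port for testing, you can use the Testcontainers MongoDB module, which encapsulates this initialization: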
final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.2.8");

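You can start it with 'mongoDBContainer.start()' and stop it via try-with-resources or 'mongoDBContainer.stop()'.

  2. If you need a multi-node replica set on non-fixed ports to test complex production issues, you can use a project that provides a MongoDbReplicaSet builder, for example: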
try (
  //create a PSA mongoDbReplicaSet and auto-close it afterwards
  final MongoDbReplicaSet mongoDbReplicaSet = MongoDbReplicaSet.builder()
    //with 2 working nodes
    .replicaSetNumber(2)
    //with an arbiter node
    .addArbiter(true)
    //create a proxy for each node to simulate network partitioning
    .addToxiproxy(true)
    .build()
) {
  //start it
  mongoDbReplicaSet.start();
  assertNotNull(mongoDbReplicaSet.getReplicaSetUrl());
  //do some testing
}
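
For the single-node Testcontainers option in item 1, the sketch below shows one way the container's connection string might be handed to Spring Boot in a JUnit 5 test. It assumes the Testcontainers JUnit 5 extension and Spring's @DynamicPropertySource (available from Spring Boot 2.2.6) are on the test classpath. The class name MongoTransactionIT and the empty test method are hypothetical placeholders.

```java
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

// Hypothetical test class name, for illustration only.
@Testcontainers
@SpringBootTest
class MongoTransactionIT {

    // A static @Container is started once before all tests and stopped afterwards.
    @Container
    static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.2.8");

    // Point Spring Data MongoDB at the randomly mapped port of the container.
    @DynamicPropertySource
    static void mongoProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
    }

    @Test
    void contextLoads() {
        // Transactional service calls would be exercised here.
    }
}
```

Because the module initializes a single-node replica set itself, no manual rs.initiate() step should be needed in this setup.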
answered 2020-07-30T10:22:50.743