0

我的应用程序使用 Hibernate reactive 1.0.0.CR9 with vertx mysql client 4.1.2,我得到一个随机的不可重现的错误

java.util.concurrent.CompletionException: java.lang.IllegalStateException: HR000065: No Vert.x context active
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:991)
    at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
    at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:110)
    at org.hibernate.reactive.stage.impl.StageSessionFactoryImpl.withSession(StageSessionFactoryImpl.java:202)
    at org.hibernate.reactive.stage.impl.StageSessionFactoryImpl.withStatelessSession(StageSessionFactoryImpl.java:182)

另外,我正在使用下面的基本数据库存储库,它将hibernate响应的完成阶段转换为spring webflux Flux和Mono响应

public abstract class HReactiveCrudRepository<T extends BaseEntity,ID> implements ReactiveCrudRepository<T ,ID> {

    private Stage.SessionFactory sessionFactory;

    private final Class<T> type;

    public HReactiveCrudRepository(Class<T> type,Stage.SessionFactory sessionFactory){
        this.type = type;
        this.sessionFactory = sessionFactory;
    }

    @Override
    public <S extends T> Mono<S> save(S entity) {
        //Stage.StatelessSession session = sessionFactory.openStatelessSession();
        return Mono.fromCompletionStage(sessionFactory.withStatelessSession(session ->
                session.withTransaction(transaction ->
                                entity.isNew()?session.insert(entity):session.update(entity))
                        .thenApply(unused -> entity)));
    }

    @Override
    public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
        return null;
    }

    @Override
    public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
        return null;
    }

    @Override
    public Mono<T> findById(ID id) {
        return Mono.fromCompletionStage(sessionFactory.withStatelessSession(session ->
                        session.get(type,id)));
    }

    @Override
    public Mono<T> findById(Publisher<ID> id) {
        return null;
    }

    @Override
    public Mono<Boolean> existsById(ID id) {
        return Mono.fromCompletionStage(sessionFactory.withStatelessSession(session ->
                                session.get(type,id)))
                .map(object -> true)
                .defaultIfEmpty(false);
    }

    @Override
    public Mono<Boolean> existsById(Publisher<ID> id) {
        return null;
    }

    @Override
    public Flux<T> findAll() {
        return null;
    }

    @Override
    public Flux<T> findAllById(Iterable<ID> ids) {
        return null;
    }

    @Override
    public Flux<T> findAllById(Publisher<ID> idStream) {
        return null;
    }

    @Override
    public Mono<Long> count() {
        return null;
    }

    @Override
    public Mono<Void> deleteById(ID id) {
        return findById(id).flatMap(object -> Mono.fromCompletionStage(sessionFactory.withStatelessSession(session ->
                session.withTransaction(transaction ->
                        session.delete(object)))));
    }

    @Override
    public Mono<Void> deleteById(Publisher<ID> id) {
        return null;
    }

    @Override
    public Mono<Void> delete(T entity) {
        return Mono.fromCompletionStage(sessionFactory.withStatelessSession(session ->
                session.withTransaction(transaction ->
                        session.delete(entity))));
    }

    @Override
    public Mono<Void> deleteAllById(Iterable<? extends ID> ids) {
        return null;
    }

    @Override
    public Mono<Void> deleteAll(Iterable<? extends T> entities) {
        return null;
    }

    @Override
    public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
        return null;
    }

    @Override
    public Mono<Void> deleteAll() {
        return null;
    }

    @Override
    public Flux<T> findByQuery(String query, List<String> params, List<String> paramsValue) {
        return Mono.fromCompletionStage(sessionFactory.withStatelessSession( session -> {
            Stage.Query<T> namedQuery = session.createNativeQuery(
                    query
                    , type);
            IntStream.range(0,params.size()).forEach(i -> namedQuery.setParameter(params.get(i),paramsValue.get(i)));
            return namedQuery.getResultList();
        })).flatMapIterable(list -> list);
    }

    @Override
    public Flux<T> findByCriteria(CriteriaQuery<T> criteriaQuery, Predicate predicate) {

        return Mono.fromCompletionStage(sessionFactory.withStatelessSession( session -> {
            Stage.Query<T> namedQuery = session.createQuery(criteriaQuery);
            return namedQuery.getResultList();
        })).flatMapMany(Flux::fromIterable);
    }

    @Override
    public CriteriaBuilder getCriteriaBuilder() {
        return sessionFactory.getCriteriaBuilder();
    }
}

只有在上述存储库的用户数量似乎很大的情况下,才会随机出现错误。

4

0 回答 0