0

我正在尝试构建一个示例应用程序:消息可以保存到数据库中,然后当 websocket 使用某个 sessionId 建立连接时,保存在该 sessionId 下的所有消息都会被发送出去。

这是我的代码:

Message.java
---
package org.acme.hibernate.orm.panache;

import java.util.Date;

import javax.persistence.Cacheable;
import javax.persistence.Entity;
import io.quarkus.panache.common.Sort;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;
import io.smallrye.mutiny.Multi;

/**
 * Persistent chat message stored through Hibernate Reactive Panache.
 *
 * <p>Each message carries the identifier of the session it belongs to, its text
 * content and the moment it was created.
 */
@Entity
@Cacheable
public class Message extends PanacheEntity {

  public String sessionId;
  public String content;
  public Date timestamp;

  /** Creates an empty message stamped with the current time. */
  public Message() {
    this.timestamp = new Date();
  }

  /**
   * Creates a message stamped with the current time.
   *
   * @param content   text of the message
   * @param sessionId identifier of the session the message belongs to
   */
  public Message(String content, String sessionId) {
    this();
    this.content = content;
    this.sessionId = sessionId;
  }

  /**
   * Streams the messages whose {@code sessionId} equals the given value,
   * ordered by {@code timestamp}.
   *
   * @param sessionId session identifier to filter on
   * @return a {@link Multi} emitting the matching messages
   */
  public static Multi<Message> getBySessionId(String sessionId) {
    Sort byTimestamp = Sort.by("timestamp");
    return stream("sessionId", byTimestamp, sessionId);
  }

}

MessageResource.java
---
package org.acme.hibernate.orm.panache;

import java.util.Date;
import java.util.List;

import static javax.ws.rs.core.Response.Status.CREATED;

import javax.inject.Inject;
import javax.transaction.Transactional;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;


/**
 * REST resource exposing {@link Message} entities as JSON under {@code /message}.
 */
@Path("/message")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class MessageResource {

  /**
   * Lists every stored message.
   *
   * @return a {@link Uni} resolving to all messages
   */
  @GET
  public Uni<List<Message>> list() {
    return Message.listAll();
  }

  /**
   * Streams the messages stored for one session, ordered by timestamp.
   *
   * @param sessionId session identifier taken from the request path
   * @return a {@link Multi} emitting the matching messages
   */
  @GET
  @Path("/{sessionId}")
  public Multi<Message> listBySessionId(@PathParam("sessionId") String sessionId) {
    return Message.getBySessionId(sessionId);
  }

  /**
   * Persists a new message and answers with {@code 201 Created}.
   *
   * <p>The message is persisted exactly once, inside the reactive transaction
   * opened by {@link Panache#withTransaction}. The timestamp is always
   * overwritten with the server time.
   *
   * @param message message deserialized from the request body
   * @return a {@link Uni} resolving to the HTTP response carrying the persisted message
   * @throws WebApplicationException with status 422 when the request has no body
   */
  @POST
  public Uni<Response> create(Message message) {
    if (message == null) {
      throw new WebApplicationException("Message was not set on request.", 422);
    }
    message.timestamp = new Date();
    // persist() only returns a lazy Uni; it must be subscribed as part of the
    // returned pipeline, so it is invoked solely through withTransaction.
    return Panache.withTransaction(message::persist)
      .replaceWith(Response.ok(message).status(CREATED)::build);
  }

  /**
   * Create a HTTP response from an exception.
   *
   * Response Example:
   *
   * <pre>
   * HTTP/1.1 422 Unprocessable Entity
   * Content-Length: 111
   * Content-Type: application/json
   *
   * {
   *     "code": 422,
   *     "error": "Message was not set on request.",
   *     "exceptionType": "javax.ws.rs.WebApplicationException"
   * }
   * </pre>
   */
  @Provider
  public static class ErrorMapper implements ExceptionMapper<Exception> {

    @Inject
    ObjectMapper objectMapper;

    /**
     * Builds a JSON error body with the exception type, status code and,
     * when available, the error message.
     *
     * @param exception exception raised while handling a request
     * @return response whose status is the one carried by a
     *         {@link WebApplicationException}, or 500 otherwise
     */
    @Override
    public Response toResponse(Exception exception) {

      Throwable throwable = exception;

      int code = 500;
      if (throwable instanceof WebApplicationException) {
        code = ((WebApplicationException) exception).getResponse().getStatus();
      }

      // Mutiny wraps failures in a CompositeException; report the underlying
      // cause instead of the wrapper when one is present.
      if (throwable instanceof CompositeException && throwable.getCause() != null) {
        throwable = throwable.getCause();
      }

      ObjectNode exceptionJson = objectMapper.createObjectNode();
      exceptionJson.put("exceptionType", throwable.getClass().getName());
      exceptionJson.put("code", code);

      // Check the same throwable whose message is written, so an unwrapped
      // cause without a message does not produce a null "error" entry.
      String errorMessage = throwable.getMessage();
      if (errorMessage != null) {
        exceptionJson.put("error", errorMessage);
      }

      return Response.status(code)
          .entity(exceptionJson)
          .build();
    }
  }
}
ReplaySocket.java
---
package org.acme.websockets;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.quarkus.hibernate.reactive.panache.PanacheEntityBase;
import io.quarkus.panache.common.Sort;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.acme.hibernate.orm.panache.Message;
import org.acme.hibernate.orm.panache.MessageResource;
import org.jboss.logging.Logger;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

@ServerEndpoint("/replay/{sessionId}")         
@ApplicationScoped
public class ReplaySocket {

  @Inject
  MessageResource messageResource;

  private static final Logger LOG = Logger.getLogger(ReplaySocket.class.getName());
  
  Map<String, Session> sessions = new ConcurrentHashMap<>(); 

  @OnOpen
    public void onOpen(Session session, @PathParam("sessionId") String sessionId) {
        sessions.put(sessionId, session);
        broadcast("Starting");
        replayMessages(sessionId);
    }

    private void replayMessages(String sessionId) {
      Multi<Message> messages = Message.stream("sessionId", Sort.by("timestamp"), sessionId);

      
      messages.subscribe().with(
        message -> broadcast(message.content),
        failure -> System.out.println(failure)
      );
    }

    private void broadcast(String message) {
      sessions.values().forEach(s -> {
          s.getAsyncRemote().sendObject(message, result ->  {
              if (result.getException() != null) {
                  System.out.println("Unable to send message: " + result.getException());
              }
          });
      });
  }

}

当我运行它时,我可以通过 MessageResource 端点保存和获取消息。但是,当我尝试在 websocket 中获取消息时,出现以下错误:

2021-07-13 10:23:57,016 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-8) failed to execute statement [select message0_.id as id1_0_, message0_.content as content2_0_, message0_.sessionId as sessioni3_0_, message0_.timestamp as timestam4_0_ from Message message0_ where message0_.sessionId=$1 order by message0_.timestamp]
2021-07-13 10:23:57,017 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-8) could not execute query: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Session/EntityManager is closed
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1081)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:360)
    at io.vertx.core.Future$$Lambda$889/0x0000000000000000.handle(Unknown Source)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:124)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
    at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
    at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:102)
    at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:35)
    at io.vertx.core.Promise.complete(Promise.java:66)
    at io.vertx.core.Promise.handle(Promise.java:51)
    at io.vertx.core.Promise.handle(Promise.java:29)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:124)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.vertx.core.impl.future.FutureBase$$Lambda$625/0x0000000000000000.run(Unknown Source)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:836)
Caused by: java.lang.IllegalStateException: Session/EntityManager is closed
    at org.hibernate.internal.AbstractSharedSessionContract.checkOpen(AbstractSharedSessionContract.java:393)
    at org.hibernate.engine.spi.SharedSessionContractImplementor.checkOpen(SharedSessionContractImplementor.java:148)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1561)
    at org.hibernate.internal.AbstractSharedSessionContract.checkOpenOrWaitingForAutoClose(AbstractSharedSessionContract.java:399)
    at org.hibernate.internal.SessionImpl.getEntityUsingInterceptor(SessionImpl.java:592)
    at org.hibernate.loader.Loader.getRow(Loader.java:1609)
    at org.hibernate.loader.Loader.getRowFromResultSet(Loader.java:747)
    at org.hibernate.loader.Loader.getRowsFromResultSet(Loader.java:1046)
    at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader.getRowsFromResultSet(ReactiveQueryLoader.java:223)
    at org.hibernate.reactive.loader.ReactiveLoaderBasedResultSetProcessor.reactiveExtractResults(ReactiveLoaderBasedResultSetProcessor.java:72)
    at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader$1.reactiveExtractResults(ReactiveQueryLoader.java:72)
    at org.hibernate.reactive.loader.ReactiveLoader.reactiveProcessResultSet(ReactiveLoader.java:146)
    at org.hibernate.reactive.loader.ReactiveLoader.lambda$doReactiveQueryAndInitializeNonLazyCollections$0(ReactiveLoader.java:77)
    at org.hibernate.reactive.loader.ReactiveLoader$$Lambda$1220/0x0000000000000000.apply(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
    ... 23 more

java.lang.IllegalStateException: Session/EntityManager is closed

我哪里错了?我试图做的事情根本无效吗?如果是这样,你能推荐一些不同的东西吗?

谢谢!

4

1 回答 1

1

如您所见,来自数据库的响应是由名为 vert.x-eventloop-thread-8 的线程处理的。这是意料之中的:来自反应式 SQL 驱动程序的所有响应都在 eventloop 线程上传递。

您看到的 IllegalStateException 很可能是因为 Session 并不是在 eventloop 线程上打开的——这会引发问题,因为所有这些代码都被设计为在单个 eventloop 线程上运行(不需要多线程,更不用说多线程还会降低性能)。您可以在调用 Panache Reactive 之前,在代码中记录当前线程的名称来验证这一点:我敢打赌它不是 vert.x-eventloop-thread-X。

换句话说,Panache Reactive 旨在用于完全反应式的上下文。

从 Quarkus 2.1 开始,Panache Reactive 应该能够检测到这种模式并将操作卸载到正确的线程上。

于 2021-07-20T12:58:58.597 回答