我正在尝试构建一个示例应用程序,其中消息可以保存到数据库,然后当 websocket 与 sessionId 连接时,这些消息保存在下面,它们都被发送出去。
这是我的代码:
Message.java
---
package org.acme.hibernate.orm.panache;
import java.util.Date;
import javax.persistence.Cacheable;
import javax.persistence.Entity;
import io.quarkus.panache.common.Sort;
import io.quarkus.hibernate.reactive.panache.PanacheEntity;
import io.smallrye.mutiny.Multi;
@Entity
@Cacheable
public class Message extends PanacheEntity {
public String sessionId;
public String content;
public Date timestamp;
public Message() {
this.timestamp = new Date();
}
public Message(String content, String sessionId) {
this.content = content;
this.timestamp = new Date();
this.sessionId = sessionId;
}
public static Multi<Message> getBySessionId(String sessionId) {
return stream("sessionId", Sort.by("timestamp"), sessionId);
}
}
MessageResource.java
---
package org.acme.hibernate.orm.panache;
import java.util.Date;
import java.util.List;
import static javax.ws.rs.core.Response.Status.CREATED;
import javax.inject.Inject;
import javax.transaction.Transactional;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@Path("/message")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class MessageResource {
@GET
public Uni<List<Message>> list() {
return Message.listAll();
}
@GET
@Path("/{sessionId}")
public Multi<Message> listBySessionId(@PathParam("sessionId") String sessionId) {
return Message.getBySessionId(sessionId);
}
@POST
@Transactional
public Uni<Response> create(Message message) {
message.timestamp = new Date();
message.persist();
return Panache.withTransaction(message::persist)
.replaceWith(Response.ok(message).status(CREATED)::build);
}
/**
* Create a HTTP response from an exception.
*
* Response Example:
*
* <pre>
* HTTP/1.1 422 Unprocessable Entity
* Content-Length: 111
* Content-Type: application/json
*
* {
* "code": 422,
* "error": "Fruit name was not set on request.",
* "exceptionType": "javax.ws.rs.WebApplicationException"
* }
* </pre>
*/
@Provider
public static class ErrorMapper implements ExceptionMapper<Exception> {
@Inject
ObjectMapper objectMapper;
@Override
public Response toResponse(Exception exception) {
Throwable throwable = exception;
int code = 500;
if (throwable instanceof WebApplicationException) {
code = ((WebApplicationException) exception).getResponse().getStatus();
}
// This is a Mutiny exception and it happens, for example, when we try to insert a new
// fruit but the name is already in the database
if (throwable instanceof CompositeException) {
throwable = ((CompositeException) throwable).getCause();
}
ObjectNode exceptionJson = objectMapper.createObjectNode();
exceptionJson.put("exceptionType", throwable.getClass().getName());
exceptionJson.put("code", code);
if (exception.getMessage() != null) {
exceptionJson.put("error", throwable.getMessage());
}
return Response.status(code)
.entity(exceptionJson)
.build();
}
}
}
ReplaySocket.java
---
package org.acme.websockets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.quarkus.hibernate.reactive.panache.PanacheEntityBase;
import io.quarkus.panache.common.Sort;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.acme.hibernate.orm.panache.Message;
import org.acme.hibernate.orm.panache.MessageResource;
import org.jboss.logging.Logger;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@ServerEndpoint("/replay/{sessionId}")
@ApplicationScoped
public class ReplaySocket {
@Inject
MessageResource messageResource;
private static final Logger LOG = Logger.getLogger(ReplaySocket.class.getName());
Map<String, Session> sessions = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam("sessionId") String sessionId) {
sessions.put(sessionId, session);
broadcast("Starting");
replayMessages(sessionId);
}
private void replayMessages(String sessionId) {
Multi<Message> messages = Message.stream("sessionId", Sort.by("timestamp"), sessionId);
messages.subscribe().with(
message -> broadcast(message.content),
failure -> System.out.println(failure)
);
}
private void broadcast(String message) {
sessions.values().forEach(s -> {
s.getAsyncRemote().sendObject(message, result -> {
if (result.getException() != null) {
System.out.println("Unable to send message: " + result.getException());
}
});
});
}
}
当我运行它时,我可以保存并从 MessageResource 端点获取。但是,当我尝试在 websocket 中获取消息时,出现此错误:
2021-07-13 10:23:57,016 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-8) failed to execute statement [select message0_.id as id1_0_, message0_.content as content2_0_, message0_.sessionId as sessioni3_0_, message0_.timestamp as timestam4_0_ from Message message0_ where message0_.sessionId=$1 order by message0_.timestamp]
2021-07-13 10:23:57,017 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-8) could not execute query: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Session/EntityManager is closed
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1081)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:360)
at io.vertx.core.Future$$Lambda$889/0x0000000000000000.handle(Unknown Source)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:124)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:102)
at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:35)
at io.vertx.core.Promise.complete(Promise.java:66)
at io.vertx.core.Promise.handle(Promise.java:51)
at io.vertx.core.Promise.handle(Promise.java:29)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:124)
at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
at io.vertx.core.impl.future.FutureBase$$Lambda$625/0x0000000000000000.run(Unknown Source)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:836)
Caused by: java.lang.IllegalStateException: Session/EntityManager is closed
at org.hibernate.internal.AbstractSharedSessionContract.checkOpen(AbstractSharedSessionContract.java:393)
at org.hibernate.engine.spi.SharedSessionContractImplementor.checkOpen(SharedSessionContractImplementor.java:148)
at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1561)
at org.hibernate.internal.AbstractSharedSessionContract.checkOpenOrWaitingForAutoClose(AbstractSharedSessionContract.java:399)
at org.hibernate.internal.SessionImpl.getEntityUsingInterceptor(SessionImpl.java:592)
at org.hibernate.loader.Loader.getRow(Loader.java:1609)
at org.hibernate.loader.Loader.getRowFromResultSet(Loader.java:747)
at org.hibernate.loader.Loader.getRowsFromResultSet(Loader.java:1046)
at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader.getRowsFromResultSet(ReactiveQueryLoader.java:223)
at org.hibernate.reactive.loader.ReactiveLoaderBasedResultSetProcessor.reactiveExtractResults(ReactiveLoaderBasedResultSetProcessor.java:72)
at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader$1.reactiveExtractResults(ReactiveQueryLoader.java:72)
at org.hibernate.reactive.loader.ReactiveLoader.reactiveProcessResultSet(ReactiveLoader.java:146)
at org.hibernate.reactive.loader.ReactiveLoader.lambda$doReactiveQueryAndInitializeNonLazyCollections$0(ReactiveLoader.java:77)
at org.hibernate.reactive.loader.ReactiveLoader$$Lambda$1220/0x0000000000000000.apply(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
... 23 more
java.lang.IllegalStateException: Session/EntityManager is closed
我哪里错了?我试图做的事情根本无效吗?如果是这样,你能推荐一些不同的东西吗?
谢谢!