The Play 2.5 highlights state:

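Better control over WebSocket frames: The Play 2.5 WebSocket API gives you direct control over WebSocket frames. You can now send and receive binary, text, ping, pong and close frames. If you don't want to worry about this level of detail, Play will still automatically convert your JSON or XML data into the right kind of frame.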

However, https://www.playframework.com/documentation/2.5.x/JavaWebSockets only has examples built on LegacyWebSocket, which is deprecated.

  1. What is the recommended API/pattern for Java WebSockets? Is LegacyWebSocket the only option for Java WebSockets?
  2. Are there any examples that use the new ping/pong message types to implement a heartbeat?

1 Answer

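Disappointingly, the official documentation on this is very sparse. Perhaps Play 2.6 will bring an update. In the meantime, here is an example of how to set up a chat websocket in Play 2.5, to help anyone who needs it.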

Setup

AController.java

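// The actor system is used below by actorOf(...) and ActorFlow.actorRef(...)
@Inject
private ActorSystem actorSystem;
@Inject
private Materializer materializer;
private ActorRef chatSocketRouter;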

@Inject
public AController(@Named("chatSocketRouter") ActorRef chatInjectedActor) {
    this.chatSocketRouter = chatInjectedActor;
}


// Make a chat websocket for a user
public WebSocket chatSocket() {

    return WebSocket.Json.acceptOrResult(request -> {
        String authToken = getAuthToken();

        // Checking of token
        if (authToken == null) {
            return forbiddenResult("No [authToken] supplied.");
        }

        // Could we find the token in the database?
        final AuthToken token = AuthToken.findByToken(authToken);
        if (token == null) {
            return forbiddenResult("Could not find [authToken] in DB. Login again.");
        }

        User user = token.getUser();
        if (user == null) {
            return forbiddenResult("You are not logged in to view this stream.");
        }

        Long userId = user.getId();

        // Create a function to be run when we initialise a flow.
        // A flow basically links actors together.
        AbstractFunction1<ActorRef, Props> getWebSocketActor = new AbstractFunction1<ActorRef, Props>() {
            @Override
            public Props apply(ActorRef connectionProperties) {

                // connectionProperties is the ActorRef for the outgoing side of this
                // connection: anything told to it is sent to the client.
                // An ActorRef is a handle to an actor. An actor is not a thread; it is a
                // lightweight object that processes its mailbox one message at a time.
                // The ChatSocketActor manages the web socket connection for one user device.
                // ChatSocketActor.props() returns Props, a recipe for constructing that actor.
                Props properties = ChatSocketActor.props(connectionProperties, chatSocketRouter, userId);

                // A user can have many connections, so we need many ActorRefs per user.
                // chatSocketRouter holds a Map of userId -> connection actors, and we "tell"
                // it a lightweight object (UserMessage) made up of this connecting user's ID
                // and the connection.

                // Start an actor from the Props above and keep its ActorRef.
                // Note: ActorFlow also starts its own actor from the Props returned below.
                ActorRef anotherUserDevice = actorSystem.actorOf(properties);
                // Create a lightweight object...
                UserMessage routeThisUser = new UserMessage(userId, anotherUserDevice);
                // ... to tell the router actor that owns the Map that we have a new
                // connection from a user.
                chatSocketRouter.tell(routeThisUser, ActorRef.noSender());

                // Return the Props so ActorFlow can create the actor that manages this connection
                return properties;
            }
        };

        final Flow<JsonNode, JsonNode, ?> jsonNodeFlow =
                ActorFlow.<JsonNode, JsonNode>actorRef(getWebSocketActor,
                        100,
                        OverflowStrategy.dropTail(),
                        actorSystem,
                        materializer).asJava();

        final F.Either<Result, Flow<JsonNode, JsonNode, ?>> right = F.Either.Right(jsonNodeFlow);
        return CompletableFuture.completedFuture(right);
    });
}

// Return this whenever we want to reject a 
// user from connecting to a websocket
private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> forbiddenResult(String msg) {
    final Result forbidden = Results.forbidden(msg);
    final F.Either<Result, Flow<JsonNode, JsonNode, ?>> left = F.Either.Left(forbidden);
    return CompletableFuture.completedFuture(left);
}
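
The accept-or-reject decision made inside acceptOrResult can be hard to see through the actor plumbing. Below is a minimal, framework-free sketch of the same idea, not the Play API itself: Decision is a hypothetical, simplified stand-in for F.Either, and the tokenToUserId map is an assumed in-memory replacement for the AuthToken database lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for play.libs.F.Either:
// either a rejection reason (left) or an accepted value (right).
final class Decision<L, R> {
    private final L left;
    private final R right;
    private final boolean accepted;

    private Decision(L left, R right, boolean accepted) {
        this.left = left;
        this.right = right;
        this.accepted = accepted;
    }

    static <L, R> Decision<L, R> reject(L reason) {
        return new Decision<>(reason, null, false);
    }

    static <L, R> Decision<L, R> accept(R value) {
        return new Decision<>(null, value, true);
    }

    boolean isAccepted() { return accepted; }
    L reason() { return left; }
    R value() { return right; }
}

class HandshakeSketch {
    // tokenToUserId is an assumed in-memory replacement for the token lookup.
    static Decision<String, Long> authorize(String authToken, Map<String, Long> tokenToUserId) {
        if (authToken == null) {
            return Decision.reject("No [authToken] supplied.");
        }
        Long userId = tokenToUserId.get(authToken);
        if (userId == null) {
            return Decision.reject("Could not find [authToken] in DB. Login again.");
        }
        // In the real controller this is where the Flow would be built.
        return Decision.accept(userId);
    }

    public static void main(String[] args) {
        Map<String, Long> tokens = new HashMap<>();
        tokens.put("abc", 42L);
        System.out.println(authorize("abc", tokens).isAccepted());
        System.out.println(authorize(null, tokens).reason());
    }
}
```

In the controller above, the rejected branch corresponds to F.Either.Left(forbidden) and the accepted branch to F.Either.Right(jsonNodeFlow).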

ChatSocketActor.java

public class ChatSocketActor extends UntypedActor {

    private final ActorRef out;
    private final Long userId;
    private ActorRef chatSocketRouter;


    public ChatSocketActor(ActorRef out, ActorRef chatSocketRouter, Long userId) {
        this.out = out;
        this.userId = userId;
        this.chatSocketRouter = chatSocketRouter;
    }

    public static Props props(ActorRef out, ActorRef chatSocketRouter, Long userId) {
        return Props.create(ChatSocketActor.class, out, chatSocketRouter, userId);
    }

    // Add methods here handling each chat connection...

}

ChatSocketRouter.java

public class ChatSocketRouter extends UntypedActor {

    public ChatSocketRouter() {}


    // Stores userIds to websockets
    private final HashMap<Long, List<ActorRef>> senders = new HashMap<>();

    private void addSender(Long userId, ActorRef actorRef) {
        // Create the list for this user on first use, then add the connection
        senders.computeIfAbsent(userId, id -> new ArrayList<>()).add(actorRef);
    }


    private void removeSender(ActorRef actorRef) {
        for (List<ActorRef> refs : senders.values()) {
            refs.remove(actorRef);
        }
    }

    @Override
    public void onReceive(Object message) throws Exception {
        // Handle messages sent to this 'router' here

        if (message instanceof UserMessage) {
            UserMessage userMessage = (UserMessage) message;
            addSender(userMessage.userId, userMessage.actorRef);
            // Watch the connection actor so we can detect when it dies.
            // The controller sends UserMessage with ActorRef.noSender(), so
            // getSender() would not point at the connection here.
            getContext().watch(userMessage.actorRef);
        } else if (message instanceof Terminated) {
            // One of our watched connection actors has died.
            removeSender(((Terminated) message).getActor());

        } else {
            unhandled(message);
        }
    }
}
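
The router's bookkeeping is independent of Akka, so it can be sketched and tested on its own. The following is a minimal sketch under assumptions, not the answer's actual class: ConnectionRegistry is a hypothetical name, and the type parameter C stands in for ActorRef. Unlike removeSender above, it also drops a user's entry once their last connection is gone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the router's map; C plays the role of ActorRef.
class ConnectionRegistry<C> {
    private final Map<Long, List<C>> connections = new HashMap<>();

    // Registers one more connection (device, tab, ...) for the user.
    void add(Long userId, C connection) {
        connections.computeIfAbsent(userId, id -> new ArrayList<>()).add(connection);
    }

    // Removes the connection wherever it is registered and
    // drops users that have no connections left.
    void remove(C connection) {
        Iterator<List<C>> it = connections.values().iterator();
        while (it.hasNext()) {
            List<C> list = it.next();
            list.remove(connection);
            if (list.isEmpty()) {
                it.remove();
            }
        }
    }

    // Returns a read-only view; empty if the user is not connected.
    List<C> connectionsFor(Long userId) {
        List<C> list = connections.getOrDefault(userId, Collections.emptyList());
        return Collections.unmodifiableList(list);
    }

    int userCount() {
        return connections.size();
    }

    public static void main(String[] args) {
        ConnectionRegistry<String> registry = new ConnectionRegistry<>();
        registry.add(1L, "phone");
        registry.add(1L, "laptop");
        registry.remove("phone");
        System.out.println(registry.connectionsFor(1L));
    }
}
```

Because an actor processes one message at a time, a plain HashMap like this is safe as long as it is only touched from inside the router's onReceive.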

Example

Now, whenever you want to send a message to a client over the websocket connection, you can do something like the following:

ChatSenderController.java

private ActorRef chatSocketRouter;

@Inject
public ChatSenderController(@Named("chatSocketRouter") ActorRef chatInjectedActor) {
    this.chatSocketRouter = chatInjectedActor;
}

// Not static: the method uses the injected chatSocketRouter instance field
public void sendMessage(Long sendToId) {
    // E.g. send the chat router a message that says hi
    chatSocketRouter.tell(new Message(sendToId, "Hi"), ActorRef.noSender());
}

ChatSocketRouter.java

@Override
public void onReceive(Object message) throws Exception {
    // ...

    if (message instanceof Message) {
         Message messageToSend = (Message) message;
         // Look up the recipient's connections; null if they are not connected
         List<ActorRef> connections = senders.get(messageToSend.getSendToId());
         if (connections != null) {
              // Loop through the list above and send the message to
              // each connection. For example...
              for (ActorRef wsConnection : connections) {
                   // Send "Hi" to each of the other client's
                   // connected sessions
                   wsConnection.tell(messageToSend.getMessage(), getSelf());
              }
         }
    }

    // ...
}
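
The fan-out step can be tried without Akka. This is a rough sketch under assumptions rather than the router itself: FanOutSketch is a hypothetical class, and each connection is modelled as a Consumer<String> in place of an ActorRef, so "telling" a connection becomes calling accept. It returns how many connections received the message, which makes the not-connected case explicit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: Consumer<String> stands in for an ActorRef.
class FanOutSketch {
    private final Map<Long, List<Consumer<String>>> senders = new HashMap<>();

    void register(Long userId, Consumer<String> connection) {
        senders.computeIfAbsent(userId, id -> new ArrayList<>()).add(connection);
    }

    // Delivers the text to every connection of the user and
    // returns the number of connections reached (0 if not connected).
    int send(Long sendToId, String text) {
        List<Consumer<String>> targets =
                senders.getOrDefault(sendToId, Collections.emptyList());
        for (Consumer<String> connection : targets) {
            connection.accept(text);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        FanOutSketch router = new FanOutSketch();
        router.register(7L, text -> System.out.println("phone got: " + text));
        router.register(7L, text -> System.out.println("laptop got: " + text));
        System.out.println(router.send(7L, "Hi"));
        System.out.println(router.send(8L, "Hi"));
    }
}
```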

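Again, I wrote the above to help anyone who needs it. After searching the web I could not find a reasonable, simple example. There is an open issue for this exact topic. There are some examples online, but none of them is easy to follow. Akka has great documentation, but combining it with Play takes real mental effort.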

If you spot any problems, please help improve this answer.

Answered 2016-10-06T19:22:46.160