实际上,套接字路由是根据如何使用channel
API 在项目 Socket 模块中定义主题来完成的。对于我的 Slack 克隆,我使用三个通道。我有一个系统级通道来处理状态更新、一个用户通道和一个房间通道。
任何给定用户都订阅了 0 或 1 个频道。但是,用户可以订阅多个频道。
对于发送到特定房间的消息,我通过房间频道广播它们。
当我检测到特定房间的未读消息、通知或徽章时,我会使用用户频道。每个用户频道也存储用户订阅的房间列表(它们列在客户端的侧栏上)。
所有这一切的诀窍是使用几个通道 API,主要是intercept
、handle_out
、My.Endpoint.subscribe
和handle_info(%Broadcast{},socket)
。
- 我
intercept
用来捕获我想要忽略或在发送之前操纵的广播消息。
- 在用户频道中,我订阅了从房间频道广播的事件
- 当您订阅时,您会收到一个包含广播消息的主题、事件和有效负载的结构的
handle_info
调用。%Broadcast{}
这是我的几段代码:
defmodule UcxChat.UserSocket do
use Phoenix.Socket
alias UcxChat.{User, Repo, MessageService, SideNavService}
require UcxChat.ChatConstants, as: CC
## Channels
channel CC.chan_room <> "*", UcxChat.RoomChannel # "ucxchat:"
channel CC.chan_user <> "*", UcxChat.UserChannel # "user:"
channel CC.chan_system <> "*", UcxChat.SystemChannel # "system:"
# ...
end
# user_channel.ex
# ...
intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
#...
def handle_out("room:join", msg, socket) do
%{room: room} = msg
UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
update_rooms_list(socket)
clear_unreads(room, socket)
{:noreply, subscribe([room], socket)}
end
def handle_out("room:leave" = ev, msg, socket) do
%{room: room} = msg
debug ev, msg, "assigns: #{inspect socket.assigns}"
socket.endpoint.unsubscribe(CC.chan_room <> room)
update_rooms_list(socket)
{:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
end
# ...
defp subscribe(channels, socket) do
# debug inspect(channels), ""
Enum.reduce channels, socket, fn channel, acc ->
subscribed = acc.assigns[:subscribed]
if channel in subscribed do
acc
else
socket.endpoint.subscribe(CC.chan_room <> channel)
assign(acc, :subscribed, [channel | subscribed])
end
end
end
# ...
end
我还将 user_channel 用于与特定用户相关的所有事件,例如客户端状态、错误消息等。