5

我正在编写一个使用 Elixir Channels 来处理实时事件的应用程序。我知道每个客户端将打开 1 个套接字,并且可以在其上多路复用多个通道。所以我的应用程序是一个聊天应用程序,其中用户是多个群聊的一部分。我有 1 个名为 MessageChannel 的 Phoenix Channel,其中 join 方法将处理动态主题。

def join("groups:" <> group_id, payload, socket) do
....

假设 John 加入了组/主题 A 和 B,而 Bob 只加入了组/主题 B。当 john 向组/主题 A 发送消息时,broadcast!/3也会将该消息发送给 Bob 是否正确?因为handle_in没有消息发送到哪个主题/组的上下文。

我将如何处理它,以使 Bob 不会收到发送到 A 组的事件。我的设计是否正确?

4

3 回答 3

6

因为handle_in没有消息发送到哪个主题/组的上下文。

Phoenix.Channel.broadcast/3被调用时,显然它确实具有与消息相关联的主题(从签名中并不明显)。你可以看到从这行 channel.ex开始的代码:

def broadcast(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
    Server.broadcast pubsub_server, topic, event, message
end

因此,当broadcast/3使用套接字进行调用时,它会匹配当前主题,然后调用底层的Server.broadcast/4.

(如果你像我一样好奇,这反过来会调用底层PubSub.broadcast/3,它会做一些分发魔法来将调用路由到你配置的 pubsub 实现服务器,很可能使用 pg2 但我离题了......)

Phoenix.Channel所以,我在阅读文档时发现这种行为并不明显,但他们确实在传入事件的 phoenixframework 频道页面中明确说明了这一点:

broadcast!/3将通知所有加入的客户端关于这个套接字的主题并调用他们的handle_out/3回调。

所以它只是在“关于这个套接字的主题”上广播。他们将同一页面上的主题定义为:

topic - 字符串 topic 或 topic:subtopic 对命名空间,例如“messages”、“messages:123”</p>

因此,在您的示例中,“主题”实际上是主题:子主题对名称空间字符串:"groups:A""groups:B"。John 必须在客户端分别订阅这两个主题,因此您实际上会引用两个不同的频道,即使它们使用相同的套接字。因此,假设您使用的是 javascript 客户端,频道创建看起来像这样:

let channelA = this.socket.channel("groups:A", {});
let channelB = this.socket.channel("groups:B", {});

然后,当您从客户端在通道上发送消息时,您只使用具有主题的通道,该主题在服务器上得到模式匹配,如我们上面所见。

channelA.push(msgName, msgBody);
于 2016-12-04T18:01:02.503 回答
3

实际上,套接字路由是根据如何使用channelAPI 在项目 Socket 模块中定义主题来完成的。对于我的 Slack 克隆,我使用三个通道。我有一个系统级通道来处理状态更新、一个用户通道和一个房间通道。

任何给定用户都订阅了 0 或 1 个频道。但是,用户可以订阅多个频道。

对于发送到特定房间的消息,我通过房间频道广播它们。

当我检测到特定房间的未读消息、通知或徽章时,我会使用用户频道。每个用户频道也存储用户订阅的房间列表(它们列在客户端的侧栏上)。

所有这一切的诀窍是使用几个通道 API,主要是intercepthandle_outMy.Endpoint.subscribehandle_info(%Broadcast{},socket)

  • intercept用来捕获我想要忽略或在发送之前操纵的广播消息。
  • 在用户频道中,我订阅了从房间频道广播的事件
  • 当您订阅时,您会收到一个包含广播消息的主题、事件和有效负载的结构的handle_info调用。%Broadcast{}

这是我的几段代码:

defmodule UcxChat.UserSocket do
  use Phoenix.Socket
  alias UcxChat.{User, Repo, MessageService, SideNavService}
  require UcxChat.ChatConstants, as: CC

  ## Channels
  channel CC.chan_room <> "*", UcxChat.RoomChannel    # "ucxchat:"
  channel CC.chan_user <> "*", UcxChat.UserChannel  # "user:"
  channel CC.chan_system <> "*", UcxChat.SystemChannel  # "system:"
  # ...
end

# user_channel.ex
 # ...
 intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
 #...
 def handle_out("room:join", msg, socket) do
    %{room: room} = msg
    UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
    update_rooms_list(socket)
    clear_unreads(room, socket)
    {:noreply, subscribe([room], socket)}
  end
  def handle_out("room:leave" = ev, msg, socket) do
    %{room: room} = msg
    debug ev, msg, "assigns: #{inspect socket.assigns}"
    socket.endpoint.unsubscribe(CC.chan_room <> room)
    update_rooms_list(socket)
    {:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
  end

  # ...
  defp subscribe(channels, socket) do
    # debug inspect(channels), ""
    Enum.reduce channels, socket, fn channel, acc ->
      subscribed = acc.assigns[:subscribed]
      if channel in subscribed do
        acc
      else
        socket.endpoint.subscribe(CC.chan_room <> channel)
        assign(acc, :subscribed, [channel | subscribed])
      end
    end
  end
  # ...
end

我还将 user_channel 用于与特定用户相关的所有事件,例如客户端状态、错误消息等。

于 2017-04-14T03:14:46.247 回答
1

免责声明:我没有查看通道的内部工作原理,这些信息完全来自我在应用程序中使用通道的第一次体验。

当有人加入不同的组时(基于您的模式匹配join/3),将通过单独的通道(套接字)建立连接。因此,向 A 广播不会向 B 的成员发送消息,只会向 A 发送消息。

在我看来, Channel 模块类似于 aGenServer并且 join 有点像start_link,其中启动了一个新服务器(进程)(但是,仅当它不存在时)。

您真的可以忽略模块的内部工作,只需了解如果您加入一个名称与现有名称不同的频道,您将加入一个独特的频道。您也可以相信,如果您向频道广播,则只有该频道的成员才能收到消息。

例如,在我的应用程序中,我有一个用户频道,我希望只连接一个用户。加入看起来就像def join("agent:" <> _agent, payload, socket)agent 只是一个电子邮件地址。当我向该频道广播消息时,只有单个代理接收消息。我还有一个所有座席都加入的办公室频道,当我希望所有座席都收到消息时,我会向它广播。

希望这可以帮助。

于 2016-11-04T15:39:20.863 回答