4

我一直在尝试使用 Crystal 和 Kemal 创建一个非阻塞服务器,它将 (a) 侦听发送给它的 UDP 消息流,然后 (b) 将该消息转发到 WebSocket 到任何已经启动的浏览器ws 连接。

到目前为止,我能做到的最好的是:

require "kemal"
require "socket"

server = UDPSocket.new
server.bind "localhost", 1234
puts "Started..."

ws "/" do |socket|

    udp_working = true

    while udp_working
        message, client_addr = server.receive
        socket.send message
    end

    socket.on_close do
        puts "Goodbye..."
        udp_working = false
    end
end

这一切似乎有点不雅,而且确实没有按预期工作,因为:

  • 在正在启动的 Crystal 服务器和连接到 Crystal 服务器的第一个 Web 浏览器之间发送的所有 UDP 数据包都被缓存并在一个巨大的积压中发送
  • 与 WebSockets 断开连接的浏览器没有得到正确处理,即 socket.on_close 没有被触发,并且循环继续直到我终止 Crystal 服务器

我希望有一个 server.on_message 类型的处理,它可以让我只在收到 UDP 数据包时运行代码,而不是持续轮询阻塞服务器。有没有另一种方法可以使用 Crystal/Kemal 实现这一目标?

谢谢!

4

2 回答 2

3

您的方法有几个问题:

首先,socket.on_close不能工作,因为这条线永远不会到达。while 循环将一直运行,udp_working == true并且只会falseon_close钩子中设置。

如果您不希望 UDP 数据报堆积,则需要从一开始就接收它们,并在没有连接 websocket 的情况下做任何您想做的事情(也许处置?)。没有on_message钩子,UDPServerreceive已经是非阻塞的。所以你可以在一个循环中运行它(在它自己的光纤中)并在方法返回时采取行动。有关详细信息,请参阅Crystal 并发;还有一个使用 TCPSocket 的例子,但是 UDP 在这方面应该是类似的。

于 2017-10-18T08:50:27.513 回答
3

Crystal 为您处理非阻塞,您需要在单独的光纤中编写阻塞代码并使用通道进行通信。Crystal 将在幕后使用非阻塞代码和 select() 调用。

此外,您需要一个框架或一些您自己的代码来将收到的消息复制到每个侦听的 websocket。这通常称为发布/订阅或发布/订阅。


…</p>

ws "/" do |socket|

   udp_working = true

   while udp_working
       message, client_addr = server.receive
       socket.send message
   end

   socket.on_close do
       puts "Goodbye..."
       udp_working = false
   end
end

即 socket.on_close 没有被触发,

您想先运行socket.on_close,否则不会添加事件处理程序,因此直到循环之后才会运行,但循环实际上是无限的。此外,socket.send message如果在检查udp_workingvar 和调用#send

于 2017-10-18T08:54:27.293 回答