2

语境

我正在开发一个 Mac 应用程序。在这个应用程序中,我想运行一个 websocket 服务器。为此,我使用了 Swift NIO 和 Websocket-Kit。我的完整设置如下。

问题

Websocket-Kit 和 SwiftNIO 的所有文档都旨在创建一个服务器端进程,该进程在您从命令行启动时启动,然后无限运行。

在我的应用程序中,我必须能够启动 websocket 服务器,然后将其关闭并按需重新启动,而无需重新启动我的应用程序。下面的代码可以做到这一点,但我想确认两件事:

  1. test()函数中,我向所有连接的客户端发送一些文本。我不确定这是否是线程安全的和正确的。我可以WebSocket像在这里一样存储实例并从应用程序的主线程向它们发送消息吗?

  2. 我是否正确关闭了 websocket 服务器?调用的结果是serverBootstrap(group:)[...].bind(host:port:).wait()创建一个Channel然后无限等待。当我调用shutdownGracefully()关联EventLoopGroup时,该服务器是否已正确清理?(我可以确认关闭后端口 5759 再次空闲,所以我一切都清理干净了?)

感谢您的输入;很难找到在应用程序中使用 SwiftNIO 和 Websocket-Kit 的示例。

代码

import Foundation
import NIO
import NIOHTTP1
import NIOWebSocket
import WebSocketKit


@objc class WebsocketServer: NSObject
{
    private var queue: DispatchQueue?
    private var eventLoopGroup: MultiThreadedEventLoopGroup?
    private var websocketClients: [WebSocket] = []
    
    
    @objc func startServer()
    {
        queue = DispatchQueue.init(label: "socketServer")
        queue?.async
        {
            let upgradePipelineHandler: (Channel, HTTPRequestHead) -> EventLoopFuture<Void> = { channel, req in
                
                WebSocket.server(on: channel) { ws in
                    ws.send("You have connected to WebSocket")
                    
                    DispatchQueue.main.async {
                        self.websocketClients.append(ws)
                        print("websocketClients after connection: \(self.websocketClients)")
                    }
                
                    ws.onText { ws, string in
                        print("received")
                        ws.send(string.trimmingCharacters(in: .whitespacesAndNewlines).reversed())
                    }
                
                    ws.onBinary { ws, buffer in
                        print(buffer)
                    }
                
                    ws.onClose.whenSuccess { value in
                        print("onClose")
                        
                        DispatchQueue.main.async
                        {
                            self.websocketClients.removeAll { (socketToTest) -> Bool in
                                return socketToTest === ws
                            }
                            
                            print("websocketClients after close: \(self.websocketClients)")
                        }
                    }
                }
            }

            self.eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
            let port: Int = 5759

            let promise = self.eventLoopGroup!.next().makePromise(of: String.self)
            
            let server = try? ServerBootstrap(group: self.eventLoopGroup!)
                
                // Specify backlog and enable SO_REUSEADDR for the server itself
                .serverChannelOption(ChannelOptions.backlog, value: 256)
                .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
                
                .childChannelInitializer { channel in
              
                let webSocket = NIOWebSocketServerUpgrader(
                    shouldUpgrade: { channel, req in
                        return channel.eventLoop.makeSucceededFuture([:])
                    },
                    upgradePipelineHandler: upgradePipelineHandler
                )
              
                return channel.pipeline.configureHTTPServerPipeline(
                    withServerUpgrade: (
                        upgraders: [webSocket],
                        completionHandler: { ctx in
                            // complete
                        })
                )
            }.bind(host: "0.0.0.0", port: port).wait()                  

            _ = try! promise.futureResult.wait()
        }
    }
    
    
    
    ///
    ///  Send a message to connected clients, then shut down the server.
    ///
    @objc func test()
    {
        self.websocketClients.forEach { (ws) in
            ws.eventLoop.execute {
                ws.send("This is a message being sent to all websockets.")
            }
        }
        
        stopServer()
    }
    
    
    
    @objc func stopServer()
    {
        self.websocketClients.forEach { (ws) in
            try? ws.eventLoop.submit { () -> Void in
                print("closing websocket: \(ws)")
                _ = ws.close()
            }.wait()                        // Block until complete so we don't shut down the eventLoop before all clients get closed.
        }
        
        eventLoopGroup?.shutdownGracefully(queue: .main, { (error: Error?) in

            print("Eventloop shutdown now complete.")
            self.eventLoopGroup = nil
            self.queue = nil
        })
    }
}
4

1 回答 1

2

在 test() 函数中,我向所有连接的客户端发送了一些文本。我不确定这是否是线程安全的和正确的。我可以像在这里一样存储 WebSocket 实例并从应用程序的主线程向它们发送消息吗?

就像你在这里做的那样,是的,那应该是安全的。ws.eventLoop.execute将在属于该 WebSocket 连接的事件循环线程上执行该块。这将是安全的。

当我在关联的 EventLoopGroup 上调用 shutdownGracefully() 时,该服务器是否已正确清理?(我可以确认关闭后端口 5759 再次空闲,所以我猜一切都清理干净了?)

是的。shutdownGracefully强制关闭所有连接和侦听套接字。

于 2020-11-16T10:24:46.273 回答