语境
我正在开发一个 Mac 应用程序。在这个应用程序中,我想运行一个 websocket 服务器。为此,我使用了 Swift NIO 和 Websocket-Kit。我的完整设置如下。
问题
Websocket-Kit 和 SwiftNIO 的所有文档都旨在创建一个服务器端进程,该进程在您从命令行启动时启动,然后无限运行。
在我的应用程序中,我必须能够启动 websocket 服务器,然后将其关闭并按需重新启动,而无需重新启动我的应用程序。下面的代码可以做到这一点,但我想确认两件事:
在
test()
函数中,我向所有连接的客户端发送一些文本。我不确定这是否是线程安全的和正确的。我可以WebSocket
像在这里一样存储实例并从应用程序的主线程向它们发送消息吗?我是否正确关闭了 websocket 服务器?调用的结果是
serverBootstrap(group:)[...].bind(host:port:).wait()
创建一个Channel
然后无限等待。当我调用shutdownGracefully()
关联EventLoopGroup
时,该服务器是否已正确清理?(我可以确认关闭后端口 5759 再次空闲,所以我猜一切都清理干净了?)
感谢您的输入;很难找到在应用程序中使用 SwiftNIO 和 Websocket-Kit 的示例。
代码
import Foundation
import NIO
import NIOHTTP1
import NIOWebSocket
import WebSocketKit
@objc class WebsocketServer: NSObject
{
private var queue: DispatchQueue?
private var eventLoopGroup: MultiThreadedEventLoopGroup?
private var websocketClients: [WebSocket] = []
@objc func startServer()
{
queue = DispatchQueue.init(label: "socketServer")
queue?.async
{
let upgradePipelineHandler: (Channel, HTTPRequestHead) -> EventLoopFuture<Void> = { channel, req in
WebSocket.server(on: channel) { ws in
ws.send("You have connected to WebSocket")
DispatchQueue.main.async {
self.websocketClients.append(ws)
print("websocketClients after connection: \(self.websocketClients)")
}
ws.onText { ws, string in
print("received")
ws.send(string.trimmingCharacters(in: .whitespacesAndNewlines).reversed())
}
ws.onBinary { ws, buffer in
print(buffer)
}
ws.onClose.whenSuccess { value in
print("onClose")
DispatchQueue.main.async
{
self.websocketClients.removeAll { (socketToTest) -> Bool in
return socketToTest === ws
}
print("websocketClients after close: \(self.websocketClients)")
}
}
}
}
self.eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
let port: Int = 5759
let promise = self.eventLoopGroup!.next().makePromise(of: String.self)
let server = try? ServerBootstrap(group: self.eventLoopGroup!)
// Specify backlog and enable SO_REUSEADDR for the server itself
.serverChannelOption(ChannelOptions.backlog, value: 256)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelInitializer { channel in
let webSocket = NIOWebSocketServerUpgrader(
shouldUpgrade: { channel, req in
return channel.eventLoop.makeSucceededFuture([:])
},
upgradePipelineHandler: upgradePipelineHandler
)
return channel.pipeline.configureHTTPServerPipeline(
withServerUpgrade: (
upgraders: [webSocket],
completionHandler: { ctx in
// complete
})
)
}.bind(host: "0.0.0.0", port: port).wait()
_ = try! promise.futureResult.wait()
}
}
///
/// Send a message to connected clients, then shut down the server.
///
@objc func test()
{
self.websocketClients.forEach { (ws) in
ws.eventLoop.execute {
ws.send("This is a message being sent to all websockets.")
}
}
stopServer()
}
@objc func stopServer()
{
self.websocketClients.forEach { (ws) in
try? ws.eventLoop.submit { () -> Void in
print("closing websocket: \(ws)")
_ = ws.close()
}.wait() // Block until complete so we don't shut down the eventLoop before all clients get closed.
}
eventLoopGroup?.shutdownGracefully(queue: .main, { (error: Error?) in
print("Eventloop shutdown now complete.")
self.eventLoopGroup = nil
self.queue = nil
})
}
}