0

我有一个 Swift NIO HTTP2 服务器,它在上下文的事件循环中处理请求。但我想在另一个线程中处理请求,GCD aync 线程池并取回结果并发送它。

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    context.eventLoop.execute {
       context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in 
            // ...
            var buffer = context.channel.allocator.buffer(capacity: respBody.count)
            buffer.writeString(respBody)        
            context.channel.write(self.wrapOutboundOut(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
            return context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
        }.whenComplete { _ in
            context.close(promise: nil)
        }
    }
}

如果我将其更改为使用 GCD 全局队列,我将如何返回EventLoopFuture<Void>响应?

context.eventLoop.execute {
    context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
        DispatchQueue.global().async {
            return self.send("hello world new ok", to: context.channel).whenComplete({ _ in
                _ = context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
                context.close(promise: nil)
            })
        }
    }
}

以这种方式使用 GCD 全局队列是否可以,或者我将如何使用工作线程?


发送字符串函数调用下面的函数来编写正文。

private func sendData(_ data: Data, to channel: Channel, context: StreamContext) -> EventLoopFuture<Void> {
    let headers = self.getHeaders(contentLength: data.count, context: context)
    _ = self.sendHeader(status: .ok, headers: headers, to: channel, context: context)
    var buffer = channel.allocator.buffer(capacity: data.count)
    buffer.writeBytes(data)
    let part = HTTPServerResponsePart.body(.byteBuffer(buffer))
    return channel.writeAndFlush(part)
}
4

1 回答 1

3

SwiftNIO 中的规则是:

  • Channels 的操作是线程安全的,因此您可以从任何线程或队列中执行它们
  • ChannelHandlerContext上的操作不是线程安全的,只能在ChannelHandler. 的所有ChannelHandler事件都在右侧调用EventLoop

所以你的例子几乎是正确的,只要确保只使用Channel来自ChannelHandlerContextaDispatchQueue或任何其他线程(不是通道的EventLoop)的,而不是使用。

let channel = context.channel // save on the EventLoop
channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
    DispatchQueue.global().async {
        self.send("hello world new ok", to: channel).flatMap {
            channel.writeAndFlush(HTTPServerResponsePart.end(nil))
        }.whenComplete {
            channel.close(promise: nil)
        }
    }
}

我在这里做了一个假设,即self.send可以从任何线程调用并且不使用ChannelHandlerContext您可能存储在self. 要评估self.send这里是否可以,我需要知道它到底做了什么。


顺便说一句,在您的第一个代码片段中,您有一个多余的eventloop.execute

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
   // eventLoop.execute not necessary here
   context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in 
        // ...
        var buffer = context.channel.allocator.buffer(capacity: respBody.count)
        buffer.writeString(respBody)        
        context.channel.write(self.wrapOutboundOut(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
        return context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
    }.whenComplete { _ in
        context.close(promise: nil)
    }
}

是不必要的,因为 a上context.eventLoop.execute的任何事件总是在正确的.ChannelHandlerEventLoop

于 2019-08-20T20:48:24.093 回答