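I'm working on a driver that reads data from the network. It doesn't know how much data a response contains; the only signal is that when it tries to read and gets 0 bytes back, it is done. So my naive blocking Swift code looks like this: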

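// Assumes `Byte` is an alias for `UInt8` and `read()` is the driver's blocking read.
func readAllBlocking() -> [Byte] {
  var buffer: [Byte] = []
  var fullBuffer: [Byte] = []

  repeat {
    buffer = read() // synchronous, blocking
    fullBuffer.append(contentsOf: buffer) // keep what we just read
  } while buffer.count > 0 // an empty read means we are done

  return fullBuffer
}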

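What I can't figure out is how to write the non-blocking version, which should return a future that completes once everything has been read:

func readAllNonBlocking() -> EventLoopFuture<[Byte]> {
  // ...?
}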

I should add that I can rewrite read() so that, instead of returning [Byte], it returns an EventLoopFuture<[Byte]>.


func readAllNonBlocking(on eventLoop: EventLoop) -> EventLoopFuture<[Byte]> {
    // The accumulated chunks
    var accumulatedChunks: [Byte] = []

    // The promise that will hold the overall result
    let promise = eventLoop.makePromise(of: [Byte].self)

    // We turn the loop into recursion:
    func loop() {
        // First, we call `read` to read in the next chunk and hop
        // over to `eventLoop` so we can safely write to `accumulatedChunks`
        // without a lock.
        read().hop(to: eventLoop).map { nextChunk in
            // Next, we just append the chunk to the accumulation
            accumulatedChunks.append(contentsOf: nextChunk)
            guard nextChunk.count > 0 else {
                // An empty chunk means EOF: complete the overall result.
                promise.succeed(accumulatedChunks)
                return
            }
            // and if it wasn't empty, we loop again.
            loop()
        }.cascadeFailure(to: promise) // if anything goes wrong, we fail the whole thing.
    }

    loop() // Let's kick everything off.

    return promise.futureResult
}


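First, what you implemented above simply reads everything until it sees EOF. If this software is exposed to the internet, you should definitely add a limit on the maximum number of bytes held in memory.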
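As a rough illustration of such a limit, here is a minimal sketch of the blocking loop with a cap on the accumulated bytes. It assumes `Byte` is an alias for `UInt8` and takes the read function as a closure; `ResponseTooLarge`, `readAll(maxBytes:read:)` and the sample chunks are hypothetical names made up for this example, not part of the original code.

```swift
// Assumption: `Byte` in the question is an alias for UInt8.
typealias Byte = UInt8

// Hypothetical error type used only in this sketch.
struct ResponseTooLarge: Error {
    let limit: Int
}

/// Reads chunks from `read` until it returns an empty chunk (EOF),
/// throwing as soon as the accumulated size would exceed `maxBytes`.
func readAll(maxBytes: Int, read: () -> [Byte]) throws -> [Byte] {
    var fullBuffer: [Byte] = []
    while true {
        let chunk = read()
        if chunk.isEmpty {
            return fullBuffer // an empty read signals EOF
        }
        // Check before appending so we never accumulate more than `maxBytes`.
        guard chunk.count <= maxBytes - fullBuffer.count else {
            throw ResponseTooLarge(limit: maxBytes)
        }
        fullBuffer.append(contentsOf: chunk)
    }
}

// Usage with a fake source that yields two chunks and then EOF.
var sampleChunks: [[Byte]] = [[1, 2, 3], [4, 5], []]
let everything = try readAll(maxBytes: 16) { sampleChunks.removeFirst() }
print(everything.count)
```

The same check fits into the future-based version: before appending the next chunk, fail the promise instead of looping again once the limit would be exceeded.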

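Second, SwiftNIO is an event-driven system, so if you were to read these bytes with SwiftNIO itself, the program would look slightly different. If you're interested in what simply accumulating all bytes until EOF looks like in SwiftNIO, here it is: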

struct AccumulateUntilEOF: ByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
        // `decode` will be called if new data is coming in.
        // We simply return `.needMoreData` because we always need more data: our message only ends at EOF.
        // ByteToMessageHandler will automatically accumulate everything for us because we tell it that we need more
        // data to decode a message.
        return .needMoreData
    }

    func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
        // `decodeLast` will be called if NIO knows that this is the _last_ time a decode function is called. Usually,
        // this is because of EOF or an error.
        if seenEOF {
            // This is what we've been waiting for, `buffer` should contain all bytes, let's fire them through
            // the pipeline.
            context.fireChannelRead(self.wrapInboundOut(buffer))
        } else {
            // Odd, something else happened, probably an error or we were just removed from the pipeline. `buffer`
            // will now contain what we received so far but maybe we should just drop it on the floor.
        }
        // Either way the bytes are dealt with, so mark them as consumed.
        buffer.clear()
        return .needMoreData
    }
}

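If you want to build a complete program with SwiftNIO, here is an example: a server that accepts all data until it sees EOF and then simply writes back the number of bytes received :). Of course, in the real world you would never hold on to all the received bytes just to count them (you could just add up the size of each individual chunk), but I guess it serves as an example.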

import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
    try! group.syncShutdownGracefully()
}

struct AccumulateUntilEOF: ByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
        // `decode` will be called if new data is coming in.
        // We simply return `.needMoreData` because we always need more data: our message only ends at EOF.
        // ByteToMessageHandler will automatically accumulate everything for us because we tell it that we need more
        // data to decode a message.
        return .needMoreData
    }

    func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
        // `decodeLast` will be called if NIO knows that this is the _last_ time a decode function is called. Usually,
        // this is because of EOF or an error.
        if seenEOF {
            // This is what we've been waiting for, `buffer` should contain all bytes, let's fire them through
            // the pipeline.
            context.fireChannelRead(self.wrapInboundOut(buffer))
        } else {
            // Odd, something else happened, probably an error or we were just removed from the pipeline. `buffer`
            // will now contain what we received so far but maybe we should just drop it on the floor.
        }
        // Either way the bytes are dealt with, so mark them as consumed.
        buffer.clear()
        return .needMoreData
    }
}

// Just an example "business logic" handler. It will wait for one message
// and just write back the length.
final class SendBackLengthOfFirstInput: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundOut = ByteBuffer

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Once we receive the message, we allocate a response buffer and just write the length of the received
        // message in there. We then also close the channel.
        let allData = self.unwrapInboundIn(data)
        var response = context.channel.allocator.buffer(capacity: 10)
        response.writeString("\(allData.readableBytes)\n")
        context.writeAndFlush(self.wrapOutboundOut(response)).flatMap {
            context.close(mode: .output)
        }.whenSuccess {
            context.close(promise: nil)
        }
    }

    func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("ERROR: \(error)")
        context.channel.close(promise: nil)
    }
}

let server = try ServerBootstrap(group: group)
    // Allow us to reuse the port after the process quits.
    .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
    // We should allow half-closure because we want to write back after having received an EOF on the input
    .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
    // Our program consists of two parts:
    .childChannelInitializer { channel in
        channel.pipeline.addHandlers([
            // 1: The accumulate everything until EOF handler
            ByteToMessageHandler(AccumulateUntilEOF(),
                                 // We want 1 MB of buffering max. If you remove this parameter, it'll
                                 // buffer indefinitely.
                                 maximumBufferSize: 1024 * 1024),
            // 2: Our "business logic"
            SendBackLengthOfFirstInput()
        ])
    }
    // Let's bind port 9999 (the loopback address is an assumption, chosen to match the `nc localhost` demo)
    .bind(to: SocketAddress(ipAddress: "127.0.0.1", port: 9999))
    .wait()

// This will never return.
try server.closeFuture.wait()


Demo (the server should answer with the number of bytes it received, which is 11 for this input):

$ echo -n "hello world" | nc localhost 9999
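The remark above about not keeping every byte just to count them could be sketched roughly as follows: a handler that adds up the size of each incoming chunk and replies once the input side is closed. This is only an illustration under stated assumptions. `CountBytesUntilEOF` is a hypothetical name, and the sketch assumes it replaces both handlers in the pipeline and that `allowRemoteHalfClosure` stays enabled so that EOF arrives as a `ChannelEvent.inputClosed` event.

```swift
import NIO

// Hypothetical handler for illustration; not part of the original answer.
final class CountBytesUntilEOF: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundOut = ByteBuffer

    // Running total; only touched on the channel's event loop.
    private var totalBytes = 0

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Add the size of this chunk and let the chunk itself go.
        self.totalBytes += self.unwrapInboundIn(data).readableBytes
    }

    func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
        // With half-closure allowed, EOF on the input shows up as this event.
        if let channelEvent = event as? ChannelEvent, channelEvent == .inputClosed {
            var response = context.channel.allocator.buffer(capacity: 10)
            response.writeString("\(self.totalBytes)\n")
            // Write the count, then close the channel whether or not the write succeeded.
            context.writeAndFlush(self.wrapOutboundOut(response)).whenComplete { _ in
                context.close(promise: nil)
            }
        } else {
            // Not our event: pass it along the pipeline.
            context.fireUserInboundEventTriggered(event)
        }
    }
}
```

Compared with the accumulating decoder, memory use stays constant no matter how much the peer sends, so no `maximumBufferSize` is needed for this particular task.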
