1

否则说,我想依靠epoll(或类似的)编写看起来像不依赖回调的常规代码的异步网络代码。

代码必须看起来像同步代码,但不像同步代码而不是阻塞等待网络 io,它必须暂停当前协程并在文件描述符准备好时重新启动它。

4

2 回答 2

1

我最初的想法是依靠生成器和yield. 但这是一个错误,部分原因是 python 曾经滥用yield from.

无论如何,guile 纤维是一个很好的灵感,我将它改编为 chez 方案

这是一个示例服务器代码:

(define (handler request port)
  (values 200 #f (http-get "https://httpbin.davecheney.com/ip")))

(untangle (lambda ()
            (run-server "127.0.0.1" 8888)))

根据handlerhttpbin 服务返回其 IP。在 call/cc 实际上是 call/1cc 的帮助下,代码看起来是同步的。

untangle将使用作为参数传递的 lambda 启动事件循环!

这是 的定义run-server

(define (run-server ip port handler)
  (log 'info "HTTP server running at ~a:~a" ip port)
  (let* ((sock (socket 'inet 'stream 'ipv4)))
    (socket:setsockopt sock 1 2 1) ;; re-use address
    (socket:bind sock (make-address ip port))
    (socket:listen sock 1024)
    (let loop ()
      (let ((client (accept sock)))
        (let ((port (fd->port client)))
          (spawn (lambda () (run-once handler port)))
          (loop))))))

如您所见,没有回调。唯一与简单的同步网络服务器有所不同的是spawn在它自己的协程中处理请求的过程。特别accept是异步的。

run-once只会将方案请求传递给handler并取其 3 个值来构建响应。不是很有趣。上面看起来是同步的,但实际上是异步的部分http-get

我只会解释,接受是如何工作的,因为 http-get 需要引入自定义二进制端口,但我只想说这是相同的行为......

(define (accept fd)
  (let ((out (socket:%accept fd 0 0)))
    (if (= out -1)
        (let ((code (socket:errno)))
          (if (= code EWOULDBLOCK)
              (begin
                (abort-to-prompt fd 'read)
                (accept fd))
              (error 'accept (socket:strerror code))))
        out)))

如您所见,它调用了一个abort-to-prompt我们可以简单调用的过程,pause它将“停止”协程并调用提示处理程序。

abort-to-prompt与 合作call-with-prompt

由于 chez 方案没有提示,因此我使用两个一次性延续来模拟它call/1cc

(define %prompt #f)
(define %abort (list 'abort))

(define (call-with-prompt thunk handler)
  (call-with-values (lambda ()
                      (call/1cc
                       (lambda (k)
                         (set! %prompt k)
                         (thunk))))
    (lambda out
      (cond
       ((and (pair? out) (eq? (car out) %abort))
        (apply handler (cdr out)))
       (else (apply values out))))))

(define (abort-to-prompt . args)
  (call/1cc
   (lambda (k)
     (let ((prompt %prompt))
       (set! %prompt #f)
       (apply prompt (cons %abort (cons k args)))))))

call-with-prompt将启动一个set!全局调用的延续%prompt,这意味着THUNK. 如果 continuation 参数OUT,即 的第二个 lambda call-with-values,以唯一对象开头,%abort则表示通过 continuation 到达abort-to-prompt。它将HANDLER使用abort-to-prompt延续和传递给call-with-prompt延续参数的任何参数调用(apply handler (cons k (cdr out))).

abort-to-promp在代码执行存储在%prompt.

call-with-prompt是事件循环的核心。这是它,分为两部分:

(define (exec epoll thunk waiting)
  (call-with-prompt
   thunk
   (lambda (k fd mode) ;; k is abort-to-prompt continuation that
                       ;; will allow to restart the coroutine

     ;; add fd to the correct epoll set
     (case mode
       ((write) (epoll-wait-write epoll fd))
       ((read) (epoll-wait-read epoll fd))
       (else (error 'untangle "mode not supported" mode)))
     (scheme:hash-table-set! waiting fd (make-event k mode)))))

(define (event-loop-run-once epoll waiting)
  ;; execute every callback waiting in queue, 
  ;; call the above exec procedure 
  (let loop ()
    (unless (null? %queue)
      ;; XXX: This is done like that because, exec might spawn
      ;; new coroutine, so we need to cut %queue right now. 
      (let ((head (car %queue))
            (tail (cdr %queue)))
        (set! %queue tail)
        (exec epoll head waiting)
        (loop))))

    ;; wait for ONE event
    (let ((fd (epoll-wait-one epoll (inf))
      (let ((event (scheme:hash-table-ref waiting fd)))
        ;; the event is / will be processed, no need to keep around
        (scheme:hash-table-delete! waiting fd)
        (case (event-mode event)
          ((write) (epoll-ctl epoll 2 fd (make-epoll-event-out fd)))
          ((read) (epoll-ctl epoll 2 fd (make-epoll-event-in fd))))
        ;; here it will schedule the event continuation that is the
        ;; abort-to-prompt continuation that will be executed by the
        ;; next call the above event loop event-loop-run-once
        (spawn (event-continuation event))))))

我认为仅此而已。

于 2019-05-26T21:00:59.843 回答
1

如果您使用的是 chez-scheme,则有chez-a-sync。它使用 POSIX poll 而不是 epoll(epoll 是 linux 特有的)。guile-a-sync2 也可用于 guile-2.2/3.0。

于 2019-06-13T18:45:16.340 回答