2

我正在尝试用 common lisp 编写一个简单的异步服务器。强调简单。这是Take 2 (感谢 Rainer 的建议和格式化)

(ql:quickload (list :cl-ppcre :usocket))
(defpackage :test-server (:use :cl :cl-ppcre :usocket))
(in-package :test-server)

(defvar *socket-handle* nil)
(defparameter *channel* nil)

(defclass buffer ()
  ((contents :accessor contents :initform nil)
   (started :reader started :initform (get-universal-time))
   (state :accessor state :initform :empty)))

(defun listen-on (port &optional (stream *standard-output*))
  (setf *socket-handle* (socket-listen "127.0.0.1" port :reuse-address t))
  (let ((conns (list *socket-handle*))
        (buffers (make-hash-table)))
    (loop (loop for ready in (wait-for-input conns :ready-only t)
                do (if (typep ready 'stream-server-usocket)
                       (push (socket-accept ready) conns)
                     (let ((buf (gethash ready buffers (make-instance 'buffer))))
                       (buffered-read! (socket-stream ready) buf)
                       (when (starts-with? (list #\newline #\return #\newline #\return)
                                           (contents buf))
                         (format stream "COMPLETE ~s~%"
                                 (coerce (reverse (contents buf)) 'string))
                         (setf conns (remove ready conns))
                         (remhash ready buffers)
                         (let ((parsed (parse buf)))
                           (format stream "PARSED: ~s~%" parsed)
                           (handle-request ready (parse buf))))))))))

(defmethod parse ((buf buffer))
  (let ((lines (split "\\r?\\n" (coerce (reverse (contents buf)) 'string))))
    (second (split " " (first lines)))))

HTTP 写入:

(defmethod http-write (stream (line-end (eql :crlf)))
  (declare (ignore line-end))
  (write-char #\return stream)
  (write-char #\linefeed stream)
  (values))

(defmethod http-write (stream (line string))
  (write-string line stream)
  (http-write stream :crlf)
  (values))

(defmethod http-write (stream (lst list))
  (mapc (lambda (thing) (http-write stream thing)) lst)
  (values))

如何处理请求:

(defmethod handle-request (socket request)
  (let ((s (socket-stream socket)))
    (cond ((string= "/sub" request)
           (subscribe! socket))
          ((string= "/pub" request)
           (publish! "Got a message!")
           (http-write s (list "HTTP/1.1 200 OK"
                               "Content-Type: text/plain; charset=UTF-8"
                               "Cache-Control: no-cache, no-store, must-revalidate"
                               "Content-Length: 10" :crlf
                               "Published!" :crlf))
           (socket-close socket))
          (t (http-write s (list "HTTP/1.1 200 OK" 
                                 "Content-Type: text/plain; charset=UTF-9" 
                                 "Content-Length: 2" :crlf 
                                 "Ok" :crlf))
             (socket-close socket)))))

发布!

(defun publish! (msg)
  (loop for sock in *channel*
     do (handler-case
            (let ((s (socket-stream sock)))
              (format s "data: ~a" msg)
              (http-write s (list :crlf :crlf))
              (force-output s))
          (error (e)
             (declare (ignore e))
             (setf *channel* (remove sock *channel*))))))

订阅!

(defun subscribe! (sock)
  (let ((s (socket-stream sock)))
    (http-write s (list "HTTP/1.1 200 OK" 
                        "Content-Type: text/event-stream; charset=utf-8"
                        "Transfer-Encoding: chunked"
                        "Connection: keep-alive"
                        "Expires: Thu, 01 Jan 1970 00:00:01 GMT"
                        "Cache-Control: no-cache, no-store, must-revalidate" :crlf))
    (force-output s)
    (push sock *channel*)))

基本实用程序:

(defmethod starts-with? ((prefix list) (list list) &optional (test #'eql))
  (loop for (p . rest-p) on prefix for (l . rest-l) on list
     when (or (and rest-p (not rest-l)) (not (funcall test p l))) 
     do (return nil)
     finally (return t)))

(defun stop ()
  (when *socket-handle*
    (loop while (socket-close *socket-handle*))
    (setf *socket-handle* nil
      *channel* nil)))

(defmethod buffered-read! (stream (buffer buffer))
  (loop for char = (read-char-no-hang stream nil :eof)
     until (or (null char) (eql :eof char))
     do (push char (contents buffer))))

总结是:

  1. 它侦听指定端口并将请求数据转储到指定流
  2. 如果它收到对 的请求"/sub",它应该保留该套接字以供进一步写入。
  3. 如果它收到对 的请求"/pub",它应该向所有现有订阅者发送一条短消息
  4. plain-text "Ok"它会根据任何其他请求发回 a 。

像往常一样,欢迎所有反馈。从第 2 版开始(添加了 HTTP 友好的行尾和几个战略性的force-output调用),浏览器似乎对我更满意,但是当消息实际发送到现有通道时,Chrome 仍然会窒息。知道剩下的错误publish!是什么吗?

要清楚,做

var src = new EventSource("/sub");
src.onerror = function (e) { console.log("ERROR", e); };
src.onopen = function (e) { console.log("OPEN", e); };
src.onmessage = function (e) { console.log("MESSAGE", e) };

现在让我在 FireFox 中获得一个工作事件流(它触发onopen,并触发onmessage每次发送的更新)。但在 Chrome 中失败(触发器onopen,每次更新触发onerror而不是onmessage

任何帮助表示赞赏。

4

2 回答 2

2

我要确保的一件事:它应该在输入和输出上正确处理 CRLF。CRLF 用于 HTTP。

有两个 Common Lisp 字符:#\return#\linefeed.

不要使用#\newline. 这是一个特殊字符,取决于操作系统和特定的 CL 实现。在 Unix 操作系统上,它可能与#\linefeed. 在 Windows 实现上,它可能与返回和换行的顺序相同。因此也不要使用换行符作为格式指令~%

始终在 HTTP 协议中显式写入 return 和 newline 作为行结束。因此,您确保您的代码是可移植的并且做正确的事情。

另外,请注意,确保字符的比较不是用EQ. 字符不一定是eq。用于EQL比较身份、数字和字符。

于 2013-09-13T19:00:09.020 回答
1

好的,所以在尝试了很多东西之后,我让它工作了,但我不知道为什么。这将是我的下一个问题。

什么不起作用

  • 改变force-output调用的位置/存在(除非subscribe!publish!消息都是强制的,客户端根本不会触发任何事件)
  • 用于babel在发送之前将 SSE 事件编码为八位字节(这失败了;ssocket-stream不是binary-stream
  • 使用 重写服务器cl-async,它有自己的写例程。这种努力的结果可以在这里看到,但它根本没有帮助。Firefox/Iceweasel/Conkeror 按预期执行,但 Chrom(?:e|ium)仍然以同样的方式失败。也就是说,事件流正常打开,onopen事件触发,但每当发送实际事件时,onerror触发而不是onmessage
  • 按照 SSE 规范的“解析事件流”部分中的bom规定输出。在启动流之前做没有效果。该流仍会被 FF 等人接受,但仍会被 Safari 引擎浏览器拒绝。(write-char (code-char #xfeff) s)

此时唯一剩下的就是破坏数据包嗅探器。使用sniffit,我发现实际上nginx PushStream 模块发出的内容与我的实现发出的内容之间存在差异。

我的(是的,我假装nginx/1.2.0只是为了尽量减少反应之间的差异):

HTTP/1.1 200 OK
Server: nginx/1.2.0
Date: Sun, 15 Oct 2013 10:29:38 GMT-5
Content-Type: text/event-stream; charset=utf-8
Transfer-Encoding: chunked
Connection: keep-alive
Expires: Thu, 01 Jan 1970 00:00:01 GMT
Cache-Control: no-cache, no-store, must-revalidate

data: message goes here

nginx 推流模块:

HTTP/1.1 200 OK
Server: nginx/1.2.0
Date: Sun, 15 Sep 2013 14:40:12 GMT
Content-Type: text/event-stream; charset=utf-8
Connection: close
Expires: Thu, 01 Jan 1970 00:00:01 GMT
Cache-Control: no-cache, no-store, must-revalidate
Transfer-Encoding: chunked

6d
data: message goes here

将“6d”行添加到我的实现中使其正常工作。我不知道为什么,除非这是bom我不熟悉的 UTF-8 中 s 的一些约定。换句话说,重写subscribe!

(defun subscribe! (sock)
  (let ((s (socket-stream sock)))
    (http-write s (list "HTTP/1.1 200 OK" 
                        "Content-Type: text/event-stream; charset=utf-8"
                        "Transfer-Encoding: chunked"
                        "Connection: keep-alive"
                        "Expires: Thu, 01 Jan 1970 00:00:01 GMT"
                        "Cache-Control: no-cache, no-store, must-revalidate" :crlf
                        "6d"))
    (force-output s)
    (push sock *channel*)))

成功了。Chrom(?:e|ium)现在可以正确接受这些事件流,并且不会在发送消息时出错。

现在我需要确切地了解那里到底发生了什么......

于 2013-09-15T15:00:47.400 回答