我正在寻找构建实时流媒体服务器的 clojure 方式。我正在努力解决的特定问题是如何将值从单个提供者(网络摄像头)发送到未定义数量的线程(连接的客户端)。显然,当客户端连接时,它对来自网络摄像头的完整视频文件不感兴趣,基本上它需要发送一个标头,然后在那个确切时刻从网络摄像头到达任何包。
在直接的java中,我认为这很容易。每当客户端连接时,将连接添加到阵列,当它断开连接时,从阵列中删除连接,并且每当来自网络摄像头的新包到达时,将其发送到阵列中的每个条目。锁定数组,以便我们添加/删除条目,或者循环通过它来发送数据包。当然,我们可以在 clojure 中构建相同的内容,但这听起来很邪恶。
在消息传递多线程架构中,这听起来同样容易。
在 clojure 中我能想到的唯一解决方案是使用一系列惰性的 promise。确实它有效,但我想知道是否有另一种方法可以导致更清洁的代码和更多的 clojure-zen :)
只是为了说明:一个简化的问题,带有承诺和原子:
一个提供程序函数生成数据,一个线程读取该数据。后来创建了一些其他线程,它们想从第一个线程获取数据,但无法获取。
(defn provider []
(lazy-seq
(do
(Thread/sleep 100)
(cons (rand) (provider)))))
(def printer (agent nil))
(defn log [& line]
(send-off printer (fn [x] (apply println line))))
(def promises (atom (repeatedly promise)))
(defn client-connected-thread [x input]
(log "Client connection " x " is connected with the provider and just received" @(first input))
(recur x (rest input)))
(.start (Thread. (fn []
(loop [stream (provider)]
(when-let [item (first stream)]
(log "I received " item", will share now")
(deliver (first @promises) item)
(swap! promises rest))
(recur (rest stream))))))
(Thread/sleep 300)
(.start (Thread. #(client-connected-thread 1 @promises)))
(Thread/sleep 100)
(.start (Thread. #(client-connected-thread 2 @promises)))
(Thread/sleep 50)
(.start (Thread. #(client-connected-thread 3 @promises)))
所以,基本上问题是:这是解决这个问题的正确方法吗?
还有,我们这里说的是流媒体服务器,所以provider函数每秒会提供几万条,可能有10个客户端连接。承诺系统是否意味着如此大量的使用?