3

我正在寻找构建实时流媒体服务器的 clojure 方式。我正在努力解决的特定问题是如何将值从单个提供者(网络摄像头)发送到未定义数量的线程(连接的客户端)。显然,当客户端连接时,它对来自网络摄像头的完整视频文件不感兴趣,基本上它需要发送一个标头,然后在那个确切时刻从网络摄像头到达任何包。

在直接的java中,我认为这很容易。每当客户端连接时,将连接添加到阵列,当它断开连接时,从阵列中删除连接,并且每当来自网络摄像头的新包到达时,将其发送到阵列中的每个条目。锁定数组,以便我们添加/删除条目,或者循环通过它来发送数据包。当然,我们可以在 clojure 中构建相同的内容,但这听起来很邪恶。

在消息传递多线程架构中,这听起来同样容易。

在 clojure 中我能想到的唯一解决方案是使用一系列惰性的 promise。确实它有效,但我想知道是否有另一种方法可以导致更清洁的代码和更多的 clojure-zen :)

只是为了说明:一个简化的问题,带有承诺和原子:

一个提供程序函数生成数据,一个线程读取该数据。后来创建了一些其他线程,它们想从第一个线程获取数据,但无法获取。

(defn provider []                                                                                                                                                                                                                                                                                                             
  (lazy-seq                                                                                                                                                                                                                                                                                                                   
    (do                                                                                                                                                                                                                                                                                                                       
      (Thread/sleep 100)                                                                                                                                                                                                                                                                                                      
      (cons (rand) (provider)))))                                                                                                                                                                                                                                                                                             

(def printer (agent nil))                                                                                                                                                                                                                                                                                                     
(defn log [& line]                                                                                                                                                                                                                                                                                                            
  (send-off printer (fn [x] (apply println line))))                                                                                                                                                                                                                                                                           

(def promises (atom (repeatedly promise)))                                                                                                                                                                                                                                                                                    

(defn client-connected-thread [x input]                                                                                                                                                                                                                                                                                       
  (log "Client connection " x " is connected with the provider and just received" @(first input))                                                                                                                                                                                                                             
  (recur x (rest input)))                                                                                                                                                                                                                                                                                                     

(.start (Thread. (fn []                                                                                                                                                                                                                                                                                                       
                   (loop [stream (provider)]                                                                                                                                                                                                                                                                                  
                     (when-let [item (first stream)]                                                                                                                                                                                                                                                                          
                       (log "I received " item", will share now")                                                                                                                                                                                                                                                             
                       (deliver (first @promises) item)                                                                                                                                                                                                                                                                       
                       (swap! promises rest))                                                                                                                                                                                                                                                                                 
                       (recur (rest stream))))))                                                                                                                                                                                                                                                                              


(Thread/sleep 300)                                                                                                                                                                                                                                                                                                            
(.start (Thread. #(client-connected-thread 1 @promises)))                                                                                                                                                                                                                                                                     
(Thread/sleep 100)                                                                                                                                                                                                                                                                                                            
(.start (Thread. #(client-connected-thread 2 @promises)))                                                                                                                                                                                                                                                                     
(Thread/sleep 50)                                                                                                                                                                                                                                                                                                             
(.start (Thread. #(client-connected-thread 3 @promises)))            

所以,基本上问题是:这是解决这个问题的正确方法吗?

还有,我们这里说的是流媒体服务器,所以provider函数每秒会提供几万条,可能有10个客户端连接。承诺系统是否意味着如此大量的使用?

4

2 回答 2

1

Clojure 为您需要异步发送信息的情况提供了代理,这似乎非常适合您的用例。

您确实非常接近,只是将工作代理卡在几个位置以完成此操作。

“在直接的Clojure中,我认为这很容易。每当客户端连接时,将连接添加到代理中的代理向量,当它断开连接时,从代理的代理中删除连接,以及每当来自网络摄像头的新包到达时将其发送给代理中的每个代理。”

确保使用send-off而不是send防止清空线程池。

与“锁定数组”方法相比,这有很多优点:

  • 一个慢速客户端不会阻止您添加或删除或添加连接
  • 客户最终将获得所有帧,而无需单独跟踪每个帧
  • 你不必担心锁定
  • 您不必手动分配线程
  • 您可以使用手表等来报告性能,而无需更改算法的简单核心。

粗略的轮廓如下所示:

user> (def connections-stub (range))
user> (def connections (agent []))
#'user/connections
user> (defn accept-connection [connection] 
    (send connections conj (agent connection)))
#'user/accept-connection
user> (map accept-connection (take 10 connections-stub))
(#<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]>)

user> (defn send-frame [con-agent frame] 
       (send con-agent 
         (fn [connection frame] 
           (println "sending " frame " to " connection) connection) frame))
#'user/send-frame

user> (send-frame (first @connections) "hello")
sending  hello  to  0
#<Agent@da69a9c: 0>

user> (defn dispatch-frame [frame] 
        (doall (map #(send-frame % frame) @connections)))
#'user/dispatch-frame

user> (dispatch-frame "hello")
sending  hello  to  0
sending  hello  to  1
sending  hello  to  2
sending  hello  to  3
sending  hello  to  4
sending  hello  to  5
sending  hello  to  6
sending  hello  to  7
sending  hello  to  8
sending  hello  to  9
(#<Agent@da69a9c: 0> #<Agent@34f07ec4: 1> #<Agent@11ee68d1: 2> #<Agent@3b237a89: 3> #<Agent@1641d6b4: 4> #<Agent@3c76ced6: 5> #<Agent@1c05629d: 6> #<Agent@258d3fca: 7> #<Agent@5c56fa08: 8> #<Agent@52395294: 9>)
user> 
于 2012-03-06T01:10:49.103 回答
1

看看阿莱夫。这是一个提供“异步通道”的库,可以帮助您实现所需的场景。

于 2012-03-06T06:46:25.740 回答