0

我正在启动数百个并发http-kit.client/get请求,并提供回调以将结果写入单个文件。

什么是处理线程安全的好方法?使用chan<!!core.asyc

这是我会考虑的代码:

(defn launch-async [channel url]                                                                                                                                
  (http/get url {:timeout 5000                                                                                                                                  
                 :user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}                                              
          (fn [{:keys [status headers body error]}]                                                                                                             
            (if error                                                                                                                                           
              (put! channel (json/generate-string {:url url :headers headers :status status}))                                                                  
              (put! channel (json/generate-string body))))))                                                                                                    

(defn process-async [channel func]                                                                                                                              
  (when-let [response (<!! channel)]                                                                                                                            
    (func response)))                                                                                                                                           

(defn http-gets-async [func urls]                                                                                                                               
  (let [channel (chan)]                                                                                                                                         
    (doall (map #(launch-async channel %) urls))                                                                                                                
    (process-async channel func)))    

感谢您的见解。

4

2 回答 2

3

由于您已经在示例中使用了 core.async,我想我会指出一些问题以及如何解决这些问题。另一个答案提到使用更基本的方法,我完全同意更简单的方法就可以了。但是,使用通道,您可以使用一种简单的方法来使用不涉及映射到向量上的数据,如果您有很多响应,向量也会随着时间的推移而变大。考虑以下问题以及我们如何解决它们:

(1) 如果您的 url 列表包含超过 1024 个元素,您当前的版本将崩溃。有一个用于异步的 put 和 take 的内部缓冲区(即,put!take!阻塞但总是立即返回),限制为 1024。这是为了防止通道的无限制异步使用。要亲自查看,请致电(http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com"))

您要做的是仅在有空间时才在频道上放一些东西。这称为背压。从go block best practice上的优秀 wiki 中获取一个页面,从 http-kit 回调中执行此操作的一个聪明方法是使用put!回调选项启动下一个 http get;这只会在put!立即成功时发生,因此您永远不会出现超出通道缓冲区的情况:

(defn launch-async
  [channel [url & urls]]
  (when url
    (http/get url {:timeout 5000
                   :user-agent "Mozilla"}
              (fn [{:keys [status headers body error]}]
                (let [put-on-chan (if error
                                    (json/generate-string {:url url :headers headers :status status})
                                    (json/generate-string body))]
                  (put! channel put-on-chan (fn [_] (launch-async channel urls))))))))

(2) 接下来,您似乎只处理一个响应。相反,使用 go-loop:

(defn process-async
  [channel func]
  (go-loop []
    (when-let [response (<! channel)]
      (func response)
      (recur))))

(3) 这是你的http-gets-async功能。我认为在这里添加缓冲区没有什么害处,因为它应该可以帮助您在开始时触发大量请求:

(defn http-gets-async
  [func urls]
  (let [channel (chan 1000)]
    (launch-async channel urls)
    (process-async channel func)))

现在,您可以使用背压处理无限数量的 url。要对此进行测试,请定义一个计数器,然后让您的处理函数增加此计数器以查看您的进度。使用易于访问的 localhost URL(不建议向 Google 等发起数十万个请求):

(def responses (atom 0))
(http-gets-async (fn [_] (swap! responses inc))
                 (repeat 1000000 "http://localhost:8000"))

因为这都是异步的,所以你的函数会立即返回,你可以看看@responses增长。

您可以做的另一件有趣的事情是process-async,您可以选择将其用作通道本身的转换器,而不是在 中运行您的处理功能。

(defn process-async
  [channel]
  (go-loop []
    (when-let [_ (<! channel)]
      (recur))))

(defn http-gets-async
  [func urls]
  (let [channel (chan 10000 (map func))] ;; <-- transducer on channel
    (launch-async channel urls)
    (process-async channel)))

有很多方法可以做到这一点,包括构建它以使通道关闭(请注意,在上面,它保持打开状态)。java.util.concurrent如果您愿意,您可以在这方面提供帮助,而且它们非常易于使用。可能性非常多。

于 2017-02-21T19:03:45.027 回答
1

这很简单,我不会使用 core.async 。您可以使用原子存储使用响应向量来执行此操作,然后有一个单独的线程读取原子的内容,直到它看到所有响应。然后,在您的 http-kit 回调中,您可以直接swap!响应到 atom 中。

如果您确实想使用 core.async,我建议使用缓冲通道以防止阻塞您的 http-kit 线程池。

于 2017-02-21T17:01:17.023 回答