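I have a stream of inputs, and for each one I want to make 2 HTTPS network requests before passing the result on to another part of the program. The typical throughput is 50 inputs per second.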

for each input:
    HTTP request A
    HTTP request B
    pass event on with (A.body and B.body)

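I am using the http-kit client, which is asynchronous by default. It returns a promise and can also take a callback. Http-kit uses Java NIO (see here and here).

The rate at which requests come in, combined with the time each request takes, is high enough that the work needs to be done asynchronously.

I have tried 3 approaches:

  1. When an event comes in, put it on a channel. A number of go routines pull from the channel. Each one makes requests that "block" the go block by deref-ing the promises from the HTTP requests. This doesn't work, because I don't think the promise plays well with threads.
  2. When an event comes in, immediately start a future, which "blocks" on the async promises. This results in very high CPU usage, plus starvation of network resources somehow.
  3. When an event comes in, fire the http-kit request for request A immediately, passing in a callback that makes request B, which in turn is passed a callback that passes the event on. This leads to an out-of-memory error after a few hours.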

These all work and handle the load for a while. They all crash eventually. The most recent crash, after about 12 hours:

Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending
 tasks!
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status:
        Managed Threads: 3
        Active Threads: 1
        Active Tasks:
                com.mchange.v2.resourcepool.BasicResourcePool$1DestroyResourceTask@65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0)
        Pending Tasks:
                com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@359acb0d
Pool thread stack traces:
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main]
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560)
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main]
                java.lang.Object.wait(Native Method)
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main]
                java.lang.Object.wait(Native Method)
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)


Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen
java.lang.OutOfMemoryError: Java heap space
        at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77)
        at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76)
        at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65)
        at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63)
        at sun.security.ssl.Handshaker.activate(Handshaker.java:514)
        at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717)
        at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743)
        at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310)
        at org.httpkit.client.HttpClient.run(HttpClient.java:375)
        at java.lang.Thread.run(Thread.java:745)
Mar 10, 2016 4:56:34 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 5:00:43 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 4:58:25 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space

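I don't know what the cause of the failure is. It might be that too many closures are being held on to, or a gradual resource leak, or thread starvation.

Questions

  1. Does making 50 HTTP requests per second, each of which might take 200 ms, meaning that there might be 100 requests in flight at any given time, sound like an excessive burden?

  2. How do I do this in a way that handles the throughput and is robust?

EDIT

The YourKit profiler tells me that I have about 2 GB of char[]s via org.httpkit.client.Handlers via java.util.concurrent.FutureTasks, which suggests that references to old handlers (i.e. requests) are being retained somehow. The whole reason for trying callbacks was to avoid this (although they might somehow be caught up in closures).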

2 Answers

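  1. Does making 50 HTTP requests per second, each of which might take 200 ms, meaning that there might be 100 requests in flight at any given time, sound like an excessive burden?

This is definitely not excessive on modern hardware.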
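As a rough check of the load, Little's law says that the average number of requests in flight equals the arrival rate multiplied by the average latency. The sketch below applies it to the figures from the question, assuming a steady state and 2 requests per event; the names `mean-in-flight`, `events-per-sec`, `requests-per-event`, `latency-ms` and `in-flight` are illustrative only.

```clojure
;; Little's law: mean in-flight requests = arrival rate * mean latency.
(defn mean-in-flight
  "Average number of concurrent requests for a given request rate (per second)
  and mean latency (in milliseconds)."
  [requests-per-sec latency-ms]
  (/ (* requests-per-sec latency-ms) 1000))

(def events-per-sec 50)      ; from the question
(def requests-per-event 2)   ; requests A and B
(def latency-ms 200)         ; from the question

(def in-flight
  (mean-in-flight (* events-per-sec requests-per-event) latency-ms))
;; in-flight => 20
```

On these assumptions the average is about 20 concurrent requests, so the 100 mentioned in the question leaves generous headroom for bursts and slow responses.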

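  2. How do I do this in a way that handles the throughput and is robust?

You can combine core.async pipelines with http-kit's callbacks to achieve this. You don't actually need to create a go routine for each request (although that shouldn't hurt), because you can use async put! from the http-kit callback.

Use a bounded buffer for each step of the pipeline to limit the number of active connections, which will (at least) be constrained by the number of ephemeral TCP ports available on your system.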
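To make the effect of a bounded buffer concrete, here is a minimal sketch, separate from the pipeline itself. It uses `offer!`, which never blocks, purely to observe when the buffer is full; in a real pipeline the producer would park on `>!` instead of being refused. The names `bounded`, `accepted`, `taken` and `accepted-after-take` are made up for this illustration.

```clojure
(require '[clojure.core.async :as async])

;; A channel with a fixed buffer of 2: at most 2 items can wait in it.
(def bounded (async/chan 2))

;; offer! never blocks: it returns true when the value was accepted
;; and a falsey value when the buffer is already full.
(def accepted
  (mapv #(boolean (async/offer! bounded %)) [:a :b :c]))
;; accepted => [true true false]

;; Taking one item frees a slot, so the next offer succeeds again.
(def taken (async/poll! bounded))
(def accepted-after-take (boolean (async/offer! bounded :d)))
```

Because a full buffer stops accepting work, the upstream side slows down to the pace of the HTTP calls rather than piling up pending requests in memory.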

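Here is an example of a small program similar to what you describe. It reads "events" from a channel (in this case, each event is the ID "1") and looks those IDs up on an HTTP service. It takes the response from the first call, looks up the JSON key "next", and enqueues that as the URL for step 2. Finally, when this lookup is complete, it adds an event to the out channel, which a go routine monitors to report statistics.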

(ns concur-req.core
  (:require [clojure.core.async :as async]
           [cheshire.core :refer [decode]]
           [org.httpkit.client :as http]))

(defn url-of
  [id]
  ;; this service responds within 100-200ms
  (str "http://localhost:28080/" id ".json"))

(defn retrieve-json-async
  [url c]
  (http/get url nil
            (fn [{body :body status :status :as resp}]
              (if (= 200 status)
                (async/put! c (decode body true))
                (println "ERROR:" resp))
              (async/close! c))))

(defn run [parallelism stop-chan]
  (let [;; allocate half of the parallelism to each step
        step1-n    (int (max (/ parallelism 2) 1))
        step2-n    step1-n
        ;; buffer to take ids, transform them into urls
        step1-chan (async/chan step1-n (map url-of))
        ;; buffer for result of pulling urls from step1, xform by extracting :next url
        step2-chan (async/chan step2-n (map :next))
        ;; buffer to count completed results
        out-chan   (async/chan 1 (map (constantly 1)))
        ;; for delivering the final result
        final-chan (async/chan)
        start-time (System/currentTimeMillis)]

    ;; process URLs from step1 and put the result in step2
    (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan)
    ;; process URLs from step2 and put the result in out
    (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan)

    ;; keep the input channel full until stop-chan is closed.
    (async/go-loop []
      (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])]
        (if (= c stop-chan)
          (async/close! step1-chan)
          (recur))))

    ;; count messages on out-chan until the pipeline is closed, printing
    ;; status message every second
    (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0]
      (let [[v c] (async/alts! [status-timer out-chan])]
        (cond (= c status-timer)
              (do (println subt "records...")
                  (recur (async/timeout 1000) 0 (+ subt accu)))

              (nil? v)
              (async/>! final-chan (+ subt accu))

              :else
              (recur status-timer (+ v subt) accu))))

    ;; block until done, then emit final report.
    (let [final-total (async/<!! final-chan)
          elapsed-ms  (- (System/currentTimeMillis) start-time)
          elapsed-s   (/ elapsed-ms 1000.0)]
      (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n"
                     final-total parallelism elapsed-s
                     (int (/ final-total elapsed-s)))))))

(defn run-for
  [seconds parallelism]
  (let [stop-chan (async/chan)]
    (future
      (Thread/sleep (* seconds 1000))
      (async/close! stop-chan))
    (run parallelism stop-chan)))

(do
  ;; Warm up the connection pool, avoid somaxconn problems...
  (doseq [p (map #(* 20 (inc %)) (range 25))]
    (run-for 1 p))
  (run-for (* 60 60 6) 500))

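To test this, I set up an HTTP service that responds only after sleeping for a random time between 100 and 200 ms. Then I ran this program on my Macbook Pro for 6 hours.

With parallelism set to 500, I averaged 1155 completed transactions per second (2310 completed HTTP requests per second). I'm sure this could be higher with some tuning, especially by moving the HTTP service onto a different machine. JVM memory crept up to 1.5 GB within the first 30 minutes and then stayed at that size. I'm using Oracle's 64-bit 1.8 JVM.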

answered 2017-10-09T13:54:41.913

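An alternative to your method A (deref-ing the futures returned by http-kit inside a go block) might work, provided it is done in a way that does not block the core.async handler threads on the future. You can do that by combining http-kit's callbacks with core.async: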

(require '[clojure.core.async :as async]
         '[org.httpkit.client :as http])

(defn handle-event
  "Return a core.async channel that will contain the result of making both HTTP call A and B."
  [event-data]
  (let [event-a-chan (async/chan)
        event-b-chan (async/chan)
        return-chan  (async/chan)]
    ;; http/get takes the URL first, then the options map, then the callback.
    ;; Each callback only hands the response to a channel, so no thread blocks.
    (http/get "https://event-a-call"
              {:query-params {"param1-k" "param1-v"}}
              (fn [resp]
                (async/put! event-a-chan resp)))
    (http/get "https://event-b-call"
              {:query-params {"param1-k" "param1-v"}}
              (fn [resp]
                (async/put! event-b-chan resp)))
    ;; Park (rather than block) until both responses arrive, then emit them together.
    (async/go
      (async/>! return-chan {:event-a-response (async/<! event-a-chan)
                             :event-b-response (async/<! event-b-chan)}))
    return-chan))
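One thing this pattern leaves open is what happens when a callback never fires: the go block would then stay parked and keep its channels alive. The following is a sketch of one way to bound the wait with a core.async timeout channel. It is not part of the answer's code, and it uses a hypothetical `fake-request` stub in place of a real HTTP client so that it can run without a network; `request-chan` and `handle-event-with-deadline` are likewise illustrative names.

```clojure
(require '[clojure.core.async :as async])

(defn fake-request
  "Hypothetical stand-in for an async HTTP call. Invokes `callback` with a
  response map from a daemon thread after `delay-ms` milliseconds."
  [url delay-ms callback]
  (doto (Thread. ^Runnable
                 (fn []
                   (Thread/sleep delay-ms)
                   (callback {:status 200 :body (str "body of " url)})))
    (.setDaemon true)
    (.start))
  nil)

(defn request-chan
  "Start a fake request and return a channel that will hold its response."
  [url delay-ms]
  (let [c (async/promise-chan)]
    (fake-request url delay-ms #(async/put! c %))
    c))

(defn handle-event-with-deadline
  "Return a channel yielding both bodies, or :timeout if either response
  has not arrived within `deadline-ms`."
  [delay-a-ms delay-b-ms deadline-ms]
  (let [a        (request-chan "https://event-a-call" delay-a-ms)
        b        (request-chan "https://event-b-call" delay-b-ms)
        deadline (async/timeout deadline-ms)]
    (async/go
      ;; alts! yields nil as the value once the deadline channel has closed.
      (let [[ra _] (async/alts! [a deadline])
            [rb _] (async/alts! [b deadline])]
        (if (and ra rb)
          {:a-body (:body ra) :b-body (:body rb)}
          :timeout)))))

;; Both stubbed responses arrive well before the 2 s deadline.
(def fast-result (async/<!! (handle-event-with-deadline 20 30 2000)))
;; Response B takes 1.5 s, far longer than the 200 ms deadline.
(def slow-result (async/<!! (handle-event-with-deadline 20 1500 200)))
```

With a real client, the same shape would apply by replacing `fake-request` with the client's callback-based call; the deadline value is a tuning choice, not something the original answer specifies.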
answered 2016-03-17T00:15:25.543