
我有一个谜,会很感激你的帮助。这个谜团来自observer我在 Clojure 中编写的 rxjava 的行为,它来自于observable从在线示例中抄录的几个简单的 s。


另一个 observable 通过 Clojure 在另一个线程上异步执行相同的操作future。完全相同的观察者不会捕获发布到其的所有事件onNext;它似乎只是在尾部丢失了随机数量的消息。

promise在donCompleted的等待到期和发送到agent收集器的所有事件的等待到期之间存在以下故意竞争。如果获胜promise,我希望false在. 如果获胜,我希望看到来自's 队列的所有消息。我不期望的一个结果是AND来自. 但是,墨菲不睡觉,而这正是我所看到的。我不知道垃圾收集是否有问题,或者 Clojure 的 STM 内部排队,或者我的愚蠢,或者其他什么。onCompletedagentagenttrueonCompletedagenttrueonCompletedagent

我在这里按照自包含形式的顺序呈现源代码,以便它可以直接通过lein repl. 有三个仪式要避免:首先,leiningen 项目文件,project.clj它声明了0.9.0对 Netflix 的 rxjava 版本的依赖:

现在,命名空间和 Clojure 需求以及 Java 导入:

(ns expt2.core
  (:require clojure.pprint)
  (:refer-clojure :exclude [distinct])
  (:import [rx Observable subscriptions.Subscriptions]))


(defmacro pdump [x]
  `(let [x# ~x]
     (do (println "----------------")
         (clojure.pprint/pprint '~x)
         (println "~~>")
         (clojure.pprint/pprint x#)
         (println "----------------")

最后,给我的观察者。我使用 anagent来收集任何 observable 发送的消息onNext。我使用 anatom来收集潜在的onError. 我使用 apromise以便onCompleted观察者外部的消费者可以等待它。

(defn- subscribe-collectors [obl]
  (let [;; Keep a sequence of all values sent:
        onNextCollector      (agent [])
        ;; Only need one value if the observable errors out:
        onErrorCollector     (atom nil)
        ;; Use a promise for 'completed' so we can wait for it on
        ;; another thread:
        onCompletedCollector (promise)]
    (letfn [;; When observable sends a value, relay it to our agent"
            (collect-next      [item] (send onNextCollector (fn [state] (conj state item))))
            ;; If observable errors out, just set our exception;
            (collect-error     [excp] (reset!  onErrorCollector     excp))
            ;; When observable completes, deliver on the promise:
            (collect-completed [    ] (deliver onCompletedCollector true))
            ;; In all cases, report out the back end with this:
            (report-collectors [    ]
               ;; Wait for everything that has been sent to the agent
               ;; to drain (presumably internal message queues):
               {:onNext      (do (await-for 1000 onNextCollector)
                                 ;; Then produce the results:
                ;; If we ever saw an error, here it is:
                :onError     @onErrorCollector
                ;; Wait at most 1 second for the promise to complete;
                ;; if it does not complete, then produce 'false'.
                ;; I expect if this times out before the agent
                ;; times out to see an 'onCompleted' of 'false'.
                :onCompleted (deref onCompletedCollector 1000 false)
      ;; Recognize that the observable 'obl' may run on another thread:
      (-> obl
          (.subscribe collect-next collect-error collect-completed))
      ;; Therefore, produce results that wait, with timeouts, on both
      ;; the completion event and on the draining of the (presumed)
      ;; message queue to the agent.

现在,这是一个同步的 observable。它将 25 条消息onNext送入观察者的喉咙,然后调用他们onCompleted的 s。

(defn- customObservableBlocking []
    (fn [observer]                       ; This is the 'subscribe' method.
      ;; Send 25 strings to the observer's onNext:
      (doseq [x (range 25)]
        (-> observer (.onNext (str "SynchedValue_" x))))
      ; After sending all values, complete the sequence:
      (-> observer .onCompleted)
      ; return a NoOpSubsription since this blocks and thus
      ; can't be unsubscribed (disposed):

我们订阅我们的观察者这个 observable:

;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)


{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
 :onError @onErrorCollector,
 :onCompleted (deref onCompletedCollector 1000 false)}
 :onError nil,
 :onCompleted true}

这是一个异步 observable,它做的事情完全相同,只是在 afuture的线程上:

(defn- customObservableNonBlocking []
    (fn [observer]                       ; This is the 'subscribe' method
      (let [f (future
                ;; On another thread, send 25 strings:
                (doseq [x (range 25)]
                  (-> observer (.onNext (str "AsynchValue_" x))))
                ; After sending all values, complete the sequence:
                (-> observer .onCompleted))]
        ; Return a disposable (unsubscribe) that cancels the future:
        (Subscriptions/create #(future-cancel f))))))

;;; For unknown reasons, the following does not produce all 25 events:
(-> (customObservableNonBlocking)

但是,令人惊讶的是,这是我们在控制台上看到的:truefor onCompleted,暗示promiseDID NOT TIME-OUT; 但只有一些异步消息。我们看到的实际消息数量因运行而异,这意味着存在一些并发现象。线索赞赏。

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
 :onError @onErrorCollector,
 :onCompleted (deref onCompletedCollector 1000 false)}
 :onError nil,
 :onCompleted true}

await-foron 代理意味着阻塞当前线程,直到到目前为止(从该线程或代理)发送到代理的所有操作都发生了,这意味着在您的等待结束后可能还有其他线程可以发送消息到代理,这就是你的情况。在您对代理的等待结束并且您:onNext在地图中的键中取消了它的值后,然后您等待完成的承诺,在等待之后结果证明是真的,但同时其他一些消息被发送到代理被收集到向量中。

您可以通过将:onCompleted键作为映射中的第一个键来解决此问题,这基本上意味着等待完成,然后等待代理,因为到那时,send在已经收到 onCompleted 之后,就不会再发生对代理的调用了。

{:onCompleted (deref onCompletedCollector 1000 false)
 :onNext      (do (await-for 0 onNextCollector)
 :onError     @onErrorCollector
