0

我想运行类似的代码

(->> input
     (partition-all 5)
     (map a-side-effect)
     dorun)

异步划分输入和输出(副作用)。

然后我编写了代码来进行下面的实验。

;; using boot-clj
(set-env! :dependencies '[[org.clojure/core.async "0.2.374"]])
(require '[clojure.core.async :as async :refer [<! <!! >! >!!]])

(let [input (range 18)
      c (async/chan 1 (comp (partition-all 5)
                            (map prn)))]
  (async/onto-chan c input false)
  (async/close! c))

这段代码的解释:

  • 实际上 input 中的元素及其数量在运行之前并没有定义,并且 input 中的元素可以取 0 到 10 之间的一些数字。
  • async/onto-chan用于将元素的 Seq(输入片段)放入通道中c,并将被多次调用,因此第三个参数是false.
  • prn是 的替代品a-side-effect

我期望上面的代码打印

[0 1 2 3 4]
[5 6 7 8 9]
[10 11 12 13 14]
[15 16 17]

但是在 REPL 中它不打印任何字符。

然后我添加一个等待的时间,就像这样

(let [c (async/chan 1 (comp (partition-all 5)
                            (map prn)))]
  (async/onto-chan c (range 18) false)
  (Thread/sleep 1000) ;wait
  (async/close! c))

这段代码在上面给出了我的预期输出。

然后我检查core.async/onto-chan

我认为发生了什么:

  1. 该频道ccore.async/close!在我的代码中编辑。
  2. 因为通道被关闭了,所以参数的每一项都core.async/onto-chancore.async/>!徒劳地放在了go-loopin中。onto-chanc

close!是否有确定的方法可以在ing之前放置项目?onto-chan写一个不使用的同步版本go-loop

还是我的想法错了?

4

2 回答 2

1

您的第二个示例Thread.sleep仅错误地“起作用”。

c它起作用的原因是从' 转换器出来的每个转换结果值都是nil,并且由于nil通道中不允许 s ,因此会引发异常,并且不会将值放入c:这就是允许生产者onto-chan继续放入的原因通道而不是阻塞等待。如果将第二个示例粘贴到 REPL 中,您将看到四个堆栈跟踪——每个分区一个。

s 当然是由于nil映射过prn,这是一个副作用函数,返回nil所有输入。

如果我正确理解您的设计,您的目标是执行以下操作:

(defn go-run! [ch proc]
  (async/go-loop []
    (when-let [value (<! ch)]
      (proc value)
      (recur))))

(let [input (range 18)
      c (async/chan 1 (partition-all 5))]
  (async/onto-chan c input)
  (<!! (go-run! c prn)))
  • 你确实需要一个生产者和一个消费者,否则你的程序会阻塞。我介绍了一个go-loop消费者。
  • 非常笼统地说,map副作用不能很好地结合在一起,所以我将副作用提取prn到了消费者身上。
  • onto-chan不能被称为“多次”(至少在显示的代码中)所以它不需要false参数。
于 2016-05-07T08:28:56.860 回答
0

采用 megakorre 的想法:

(let [c (async/chan 1 (comp (partition-all 5)
                            (map prn)))
      put-ch (async/onto-chan c (range 18) false)]
  (async/alts!! [put-ch])
  (async/close! c))
于 2016-05-06T15:04:12.877 回答