我是编程新手,也是 Common Lisp 的新手。我正在尝试解决以下问题:
有流源(S1、S2、S3)、两个处理器(P1、P2)(处理器我不是指 CPU 处理器,而是处理功能/子系统)和四个流目的地(D1、D2、D3、 D4)。通过流,我的意思是数据连续出现,但可能是间歇性的。所以读/写操作可能会阻塞。来自流的数据需要聚合,然后根据它们是什么,由其中一个处理器处理。每个处理器(从其输入聚合)产生四种子聚合,然后将它们发送到上述四个目的地之一。
我有这个问题的伪代码是这样的:
三个循环,S1、S2、S3 各一个,用于数据聚合和调度。对于特定的源(比如 S2)伪代码是:
loop-2 : read from S2,
when sufficient data aggregate,
if aggregate is kind-1 send to P1
else send to P2.
两个循环,一个用于 P1、P2,用于处理和解聚合。对于特定的处理器(比如 P1)伪代码是:
In a loop for P1: take aggregate of kind-1,
process it to produce one or some of four sub-aggregates.
case: sub-aggregate 1 -> serialize and send/write to D1,
sub-aggregate 2 -> serialize and send/write to D2,
sub-aggregate 3 -> serialize and send/write to D3,
sub-aggregate 4 -> serialize and send/write to D4.
在处理器和目的地之间,我想使用另一个通道/队列,但没有提到让我的问题对自己来说更简单。
据我了解,大约有十个阻塞点。三,如果数据源中不可用(S1、S2、S3);当处理器(P1,P2)的数据不足时,三个,当目标(D1,D2,D3,D4)没有准备好写入时,四个。
所以我的问题是:如何同时运行所有这些循环(同时是这个词?)而不会相互阻塞;以及程序中正在做的其他事情。
先前的工作:我已经查看了这个SO 问题,来自cl-async 的轮询器和通知器。查看Cliki-Concurrency,我还看到了threading-queue。更不用说cl-flow和green-threads 了。CL-flow 提到它是“Common Lisp 中异步非阻塞并发的库”。有没有办法,我可以使用带有 cl-async 或 cl-flow 的队列库?例如,flow-concurrently似乎是正确的。但是它可以与 cl-async 的绿色线程一起使用来运行非终止循环吗?
我的难点:我试图阅读我上面提到的每个链接并尽可能多地理解。但是,老实说,我什至无法弄清楚从哪里/如何开始。谢谢你的帮助。
编辑:2020 年 2 月 28 日。
我尝试像这样使用 cl-flow。初始化流系统后
(ql:quickload "simple-flow-dispatcher")
(ql:quickload "cl-muth")
(ql:quickload "cl-flow")
(ql:quickload "bt-semaphore")
(use-package :cl-flow :bt-semaphore)
(defvar *output* *standard-output*)
(defun print-error (e)
(format *output* "~A" e))
(defvar *dispatcher* (simple-flow-dispatcher:make-simple-dispatcher
:threads 4
:error-handler #'print-error))
(defun run-flow (flow)
(run *dispatcher* flow))
我使用了以下功能:
(defun hello-world (n)
(sleep n)
(format t (concatenate 'string (gap n) "Loop-" (write-to-string n) "~%")))
(defun gap(n)
(if (= n 0)
""
(concatenate 'string " " (gap (- n 1)))))
(defun loop-hello (n)
(loop
(hello-world n)))
(defun test-flow ()
(run-flow
(flow:concurrently
(flow:asynchronously
(loop-hello 3))
(flow:asynchronously
(loop-hello 2))
(flow:asynchronously
(loop-hello 1)))))
hello-world 是一个简单的函数,我加入了函数 gap 来在文本前插入一些空格。正如所言,我期待三列交错的打印行(同时,异步等),但我只得到一个(第一个循环-hello)列。
我目前的理解如下,请指正:循环和睡眠都是同步语句,所以执行被阻塞。所以我搜索了循环和睡眠的异步版本。我想知道我是否可以使用 cl-async 来做到这一点。
与此同时,我试图简化我的问题。昨天,我也偶然发现了这个问题及其答案。我试图理解它。
此外,由于我使用伪代码发布了我的原始问题,我想知道我希望(比如说)一个库为我提供功能(即使它现在不存在)。
我尝试在 google 中搜索异步代码,大多数结果显示 javascript 库。到目前为止,我只能模糊地掌握两个。Python,Clojure(Java)。写一个从 q1 异步读取的小函数,处理读取 v1 到 v2 的值,然后写入 q2(q1 和 q2 是队列)。对我来说,似乎我可以这样写(说):
async def p1(x):
#compute v2 from v1 and return v2
async def keep_doing_p1(q1, q2):
while True:
v1 = await q1.pop()
v2 = p1(v1) # function p1 defined above
await q2.push(v2)
当然,必须有合适的 q1 和 q2 实现。在 Clojure(Java) 中,适当地使用 core.async 库:
(def q1 (async/chan))
(def q2 (async/chan))
(defn p1 [v1]
;;; Need a transducer version
;;; compute v2 from v1 and return)
(async/go
(-->
(async/< q1)
(p1)
(async/q2)))
我上面发布的代码可能是错误的。我是根据我(很可能)肤浅的理解写的。我没有 Python、Clojure(Java) 经验。这些例子也不是煽动任何火焰战争。我认为所有语言都可以很好、强大、方便,并迎合不同的人类程序员。我想解决 CL 中的问题。我是一个资深的人(按年龄),但一个初级的,实际上是一个新手(在编程方面)。我真的很喜欢用 CL 写它。并不是说我已经或即将成为一个伟大的程序员。但是有了CL,我觉得,天啊,我连程序都可以写了!
不管怎样,所以回到问题上来。所以我想要一个库来提供如下语法/功能:
(cc-forever q1 q2 p1
(push q2 (p1 (pop q1))))
;;; many other such cc-forever, operating concurrently
你可能已经猜到了,cc-forever 的意思是(ConCurrently-run FOREVER)。
所以我正在寻找制作这样一个函数/库的帮助。如果我的思维过程仍然很复杂,请提出更好/更简单的替代方案。谢谢。
稍后将再次发布我的尝试。
编辑:2020 年 5 月 27 日
我将继续探索 cl-async 和 cl-flow。我最近遇到了在 cl-async 之上实现的 with-green-thread 宏。作为一个新手,我面临着双重问题:这些库使用不同的语法,并且可能使用它在语义上做不同的事情。目前,我对 with-green-thread 和 cl-flow: 反复感到鼓舞。尽管有时代码会引发错误(可能是我对某些 quicklisp 问题或其他问题的处理不当)。会继续发帖。如果有任何反馈,将很有帮助。