4

我目前正在尝试为以下问题寻找 lispy/工作解决方案的问题描述:

作业队列提供了一组相等的(通过他们的代码)线程以及他们应该处理的任务。如果队列为空,线程将等到创建新条目,但我也想提供一个干净的关闭。因此,即使在等待队列时,母线程也必须可以设置一些变量/调用线程并告诉它们关闭。他们不直接遵守的唯一原因应该是线程当前正在评估任务,因此在任务完成之前忙/无法彻底关闭。

我目前有两个我不太相信的解决方案:

(defparameter *kill-yourself* nil)

(defparameter *mutex* (sb-thread:make-mutex))

(defparameter *notify* (sb-thread:make-waitqueue))

#|the queue is thread safe|#
(defparameter *job-queue* (make-instance 'queue))


(defun fill-queue (with-data)
   (fill-stuff-in-queue)
   (sb-thread:with-mutex (*mutex*)
     (sb-thread:condition-notify *notify*)))


#|solution A|#
(with-mutex (*mutex*)
  (do ((curr-job nil))
      (*kill-yourself* nil)
    (if (is-empty *job-queue*)
    (sb-thread:condition-wait *notify* *mutex*)
    (progn
      (setf curr-job (dequeue *job-queue*))
      (do-stuff-with-job)))))


#|solution B|#
(defun helper-kill-yourself-p ()
  (sb-thread:with-mutex (*mutex*)
     *kill-yourself*))

(do ((job (dequeue-* *job-queue* :timeout 0) 
      (dequeue-* *job-queue* :timeout 0)))
        ((if (helper-kill-yourself-p)
         t
                 (sb-thread:with-mutex (*mutex*)
                     (sb-thread:condition-wait *notify* *mutex*)
                     (if (helper-kill-yourself-p)
                          t
                          nil)))
         (progn
           nil))
     (do-stuff-with-job))

两个 do-loop 都可以用来启动线程。但是如果有多个线程,A 就不会真正起作用(因为互斥锁会阻止任何并行操作的发生),并且 B 解决方案看起来/很脏,因为可能存在提取的作业为零的情况。此外,我并不真正相信停止条件,因为它太长而且看起来很复杂。

实现(do)循环的正确方法是什么去关机?最后但同样重要的是,必须可以在无限数量的多个并行线程中使用这个(do)循环。

4

2 回答 2

3

解决方案 A

是的,您对解决方案 A 的看法是正确的,互斥锁不会让作业并行执行。

解决方案 B

我认为该do循环不是这项工作的正确工具。特别是,在您的代码中,可能会从队列中提取作业并且线程将退出而不执行它。这种情况是可能的,因为您在应该终止检查之前出列。另外,因为您jobdo's variables 块中定义,您会忽略从 ' 返回的多个值dequeue,这也很糟糕,因为您无法有效地检查队列是否为空。同样在您检查线程是否应该以do结束测试形式停止的情况下,您必须获取*mutex*两次,以检查线程是否应该停止并出队(或者您可以发明奇怪的结束测试形式来完成这项工作循环体)。

因此,话虽如此,您必须将所有代码放入do's body 中,并将 vars 和 end-test 留空。这就是为什么我认为loop在这种情况下更好。

如果您必须使用do循环,您可以轻松地将loopbody 包裹到其中,例如 (do nil (nil nil) *loop-body*).

我的解决方案

(require :sb-concurrency)
(use-package :sb-concurrency)
(use-package :sb-thread)

(defparameter *kill-yourself* nil)
(defparameter *mutex* (make-mutex))
(defparameter *notify* (make-waitqueue))
#|the queue is thread safe|#
(defparameter *job-queue* (make-queue :name "job-queue"))
(defparameter *timeout* 10)
(defparameter *output-lock* (make-mutex))

(defun output (line)
  (with-mutex (*output-lock*)
    (write-line line)))

(defun fill-queue (with-data)
  (enqueue with-data *job-queue*)
  (with-mutex (*mutex*)
    (condition-notify *notify*)))

(defun process-job (thread-name job)
  (funcall job thread-name))

(defun run-worker (name)
  (make-thread
    (lambda ()
      (output (format nil "starting thread ~a" name))
      (loop (with-mutex (*mutex*)
              (condition-wait *notify* *mutex* :timeout *timeout*)
              (when *kill-yourself*
                (output (format nil "~a thread quitting" name))
                (return-from-thread nil)))
            ;; release *mutex* before starting the job,
            ;; otherwise it won't allow other threads wait for new jobs

            ;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
            ;; since inbetween queue can become empty
            (multiple-value-bind (job has-job) (dequeue *job-queue*)
              (if has-job
                (process-job name job)))))
    :name name))

(defun stop-work ()
  (with-mutex (*mutex*)
    (setf *kill-yourself* t)
    (condition-broadcast *notify*)))

(defun add-job (job)
  ;; no need to put enqueue in critical section
  (enqueue job *job-queue*)
  (with-mutex (*mutex*)
    (condition-notify *notify*)))

(defun make-job (n)
  (lambda (thread-name)
    (loop for i upto 1000 collecting i)
    (output (format nil "~a thread executes ~a job" thread-name n))))

(defun try-me ()
  (run-worker "worker1")
  (run-worker "worker2")
  (loop for i upto 1000 do
        (add-job (make-job i)))
  (loop for i upto 2000 collecting i)
  (stop-work))

调用try-meREPL 应该会给你类似下面的输出

starting thread worker1
worker1 thread executes 0 job
worker1 thread executes 1 job
worker1 thread executes 2 job
worker1 thread executes 3 job
starting thread worker2
worker2 thread executes 4 job
worker1 thread executes 5 job
worker2 thread executes 6 job
worker1 thread executes 7 job
worker1 thread executes 8 job
...
worker2 thread executes 33 job
worker1 thread executes 34 job
worker2 thread executes 35 job
worker1 thread executes 36 job
worker1 thread executes 37 job
worker2 thread executes 38 job
0
worker1 thread executes 39 job
worker2 thread quitting
worker1 thread quitting

PS 我无法找到旧 SBCL 的文档,所以我将旧 API 的翻译留给你。希望它会有所帮助。

编辑类解决方案

在对您(已删除)答案的评论中,我们发现您想要一个用于事件循环的类。我想出了以下

(defclass event-loop ()
  ((lock
     :initform (make-mutex))
   (queue
     :initform (make-waitqueue))
   (jobs
     :initform (make-queue))
   (stopped
     :initform nil)
   (timeout
     :initarg :wait-timeout
     :initform 0)
   (process-job
     :initarg :process-job
     :initform #'identity)
   (worker-count
     :initarg :worker-count
     :initform (error "Must supply worker count"))))

(defmethod initialize-instance :after ((eloop event-loop) &key)
  (with-slots (worker-count timeout lock queue jobs process-job stopped) eloop
    (dotimes (i worker-count)
      (make-thread
        (lambda ()
          (loop (with-mutex (lock)
                  (condition-wait queue lock :timeout timeout)
                  (when stopped
                    (return-from-thread nil)))
                ;; release *mutex* before starting the job,
                ;; otherwise it won't allow other threads wait for new jobs

                ;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
                ;; since inbetween queue can become empty
                (multiple-value-bind (job has-job) (dequeue jobs)
                  (if has-job
                    (funcall process-job job)))))))))

(defun push-job (job event-loop )
  (with-slots (lock queue jobs) event-loop
    (enqueue job jobs)
    (with-mutex (lock)
      (condition-notify queue))))

(defun stop-loop (event-loop)
  (with-slots (lock queue stopped) event-loop
    (with-mutex (lock)
      (setf stopped t)
      (condition-broadcast queue))))

你可以像这样使用它

> (defparameter *el* (make-instance 'event-loop :worker-count 10 :process-job #'funcall))
> (defparameter *oq* (make-queue))
> (dotimes (i 100)
    (push-job (let ((n i)) (lambda ()
                             (sleep 1)
                             (enqueue (format nil "~a job done" n) *oq*))) *el*))

sb-thread:queue用作输出以避免奇怪的结果。虽然这有效,但您可以*oq*在您的 REPL 中进行检查。

> *oq*
#S(QUEUE
:HEAD (SB-CONCURRENCY::.DUMMY. "7 job done" "1 job done" "9 job done"
       "6 job done" "2 job done" "11 job done" "10 job done" "16 job done"
       "12 job done" "4 job done" "3 job done" "17 job done" "5 job done"
       "0 job done" "8 job done" "14 job done" "25 job done" "15 job done"
       "21 job done" "28 job done" "13 job done" "23 job done" "22 job done"
       "19 job done" "27 job done" "18 job done")
:TAIL ("18 job done")
:NAME NIL)
于 2013-09-09T18:30:50.287 回答
0

我使用了chanl提供消息队列机制的库。当我想关闭线程时,我只需将关键字发送:stop到队列。当然,这不会:stop在队列中所有前面的事情都完成之前停止。如果您想提前停止,您可以在数据队列之前创建另一个队列(控制队列)。

于 2013-09-09T11:06:38.720 回答