7

目前,我正在尝试使用 RESTful API 构建一个 Web 服务,以处理一些长时间运行的任务(作业)。

这个想法是用户通过执行 POST 提交作业,该 POST 返回一些用于检查作业状态的 URL,其中还包含结果的 url。一旦作业完成(即某些值被写入数据库),结果 URL 将返回适当的信息(而不是没有结果)并且作业 url 将指示完成状态。

不幸的是,计算非常密集,因此一次只能运行一个,因此需要对作业进行排队。

在伪中需要这样的东西

(def job-queue (atom queue)) ;; some queue 
(def jobs (atom {}))

(defn schedule-job [params] 
  ;; schedules the job into the queue and 
  ;; adds the job to a jobs map for checking status via GET
  ;; note that the job should not  be evaluated until popped from the queue
)

(POST "/analyze" [{params :params}] 
 (schedulde-job params))

(GET "job/:id" [:d] 
 (get @jobs id))

;; Some function that pops the next item from the queue 
;; and evaluates it when the previous item is complete
;; Note: should not terminate when queue is empty! 

我研究了允许异步处理的Lamina,但它似乎不适合我的需要。

我的问题是如何使作业队列出队并在前一个任务完成后执行其任务,而不在队列为空时终止,即永久处理传入的作业。

4

3 回答 3

9

java.util.concurrent.ExecutorService 可能是您想要的。这允许您提交作业以供以后执行,并返回一个 Future,您可以查询以发现它是否已完成。

(import '[java.util.concurrent Callable Executors])

(def job-executor
  (Executors/newSingleThreadExecutor))

(def jobs (atom {}))

(defn submit-job [func]
  (let [job-id   (str (java.util.UUID/randomUUID))
        callable (reify Callable (call [_] (func))]
    (swap! jobs assoc job-id (.submit job-executor callable))
    job-id))

(use 'compojure.core)

(defroutes app
  (POST "/jobs" [& params]
    (let [id (submit-job #(analyze params))]
      {:status 201 :headers {"Location" (str "/jobs/" id)}}))
  (GET "/jobs/:id" [id]
    (let [job-future (@jobs id)]
      (if (.isDone job-future)
        (.get job-future)
        {:status 404}))))
于 2013-02-03T20:12:11.397 回答
2

这似乎符合我的预期,但它似乎相当不习惯。有人对如何改善这一点有想法吗?

;; Create a unique identifier
(defn uuid [] (str (java.util.UUID/randomUUID)))

;; Create a job-queue and a map for keeping track of the status
(def job-queue (ref clojure.lang.PersistentQueue/EMPTY))
(def jobs (atom {}))

(defn dequeue! [queue-ref]
  ;; Pops the first element off the queue-ref
  (dosync 
    (let [item (peek @queue-ref)]
      (alter queue-ref pop)
      item)))

(defn schedule-job! [task] 
  ;; Schedule a task to be executed, expects a function (task) to be evaluated
  (let [uuid (uuid)
        job (delay task)]
    (dosync 
      (swap! jobs assoc uuid job) 
      (alter job-queue conj job))))

(defn run-jobs []
  ;; Runs the jobs 
  (while true
    (Thread/sleep 10)
    (let [curr (dequeue! job-queue)] 
      (if-not (nil? curr) (@curr)))))

(.start (Thread. run-jobs))
于 2013-02-03T16:52:34.823 回答
0

您的描述似乎是一个多生产者和单一消费者的场景。下面是一个示例代码(您可以将其与 REST 的东西和可能的一些异常处理挂钩,这样代理就不会死机)

(def worker (agent {}))                                                                                                                              

(defn do-task [name func]                                                                                                                            
  (send worker                                                                                                                                       
        (fn [results]                                                                                                                                 
          (let [r (func)]                                                                                                                            
            (assoc results name r)))))

;submit tasks                                                                                                               
(do-task "uuid1" #(print 10))                                                                                                                        
(do-task "uuid2" #(+ 1 1))

;get all results
(print @worker) 
于 2013-02-03T17:26:44.023 回答