I'm writing a stateful server in Clojure backed by Neo4j that can serve socket requests, like HTTP. Which means, of course, that I need to be able to start and stop socket servers from within this server. Design-wise, I would want to be able to declare a "service" within this server and start and stop it.

What I'm trying to wrap my mind around in Clojure is how to ensure that starting and stopping these services is thread-safe. This server I'm writing will have nREPL embedded inside it and will process incoming requests in parallel. Some of these requests will be administrative: start service X, stop service Y. That opens up the possibility that two start requests come in at the same time.

  1. Starting should synchronously check a "running" flag and a "starting" flag and fail if either is set. In the same transaction, the "starting" flag should be set.
  2. After the "starting" flag is set, the transaction closes. That makes the "starting" flag visible to other transactions.
  3. Then the (start) function actually starts the service.
  4. If (start) succeeds, the "running" flag is set and the "starting" flag is cleared, synchronously.
  5. If (start) fails, the "starting" flag is cleared and the exception is returned.
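The five steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: `state` and `start-service!` are hypothetical names, the service is started by a caller-supplied `start-fn`, steps 4 and 5 are taken to clear the "starting" flag, and the exception is rethrown rather than returned.

```clojure
;; Hypothetical state holder for one service.
(def state (ref {:running false :starting false}))

(defn start-service!
  "Hypothetical helper: runs start-fn under the starting/running protocol."
  [start-fn]
  ;; Steps 1-2: check both flags and claim :starting in a single transaction.
  (dosync
   (let [{:keys [running starting]} (ensure state)]
     (when (or running starting)
       (throw (IllegalStateException. "Service already running or starting.")))
     (alter state assoc :starting true)))
  (try
    ;; Step 3: the side effect runs outside any transaction, so an STM
    ;; retry can never start the service twice.
    (let [result (start-fn)]
      ;; Step 4: success, so set :running and clear :starting together.
      (dosync (alter state assoc :running true :starting false))
      result)
    (catch Throwable t
      ;; Step 5: failure, so clear :starting and propagate the exception.
      (dosync (alter state assoc :starting false))
      (throw t))))
```

A second caller that arrives while the first is still inside `start-fn` sees `:starting true` and gets the `IllegalStateException` immediately instead of blocking.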

Stopping needs the same thing: checking the "running" flag, and checking and setting its own "stopping" flag.

I'm trying to reason through all possible combinations of (start) and (stop).

Have I missed anything?

Is there a library for this already? If not, what should a library like this look like? I'll open source it and put it on GitHub.

Edit:

This is what I have so far. There's a hole I can see though. What am I missing?

(ns extenium.db
  (:require [clojure.tools.logging :as log])
  (:import org.neo4j.graphdb.factory.GraphDatabaseFactory))

(def ^:private
  db- (ref {:ref nil
            :running false
            :starting false
            :stopping false}))

(defn stop []
  (dosync
   (if (or (not (:running (ensure db-)))
           (:stopping (ensure db-)))
     (throw (IllegalStateException. "Database already stopped or stopping."))
     (alter db- assoc :stopping true)))
  (try
    (log/info "Stopping database")
    ;; db- is a ref, so it must be dereferenced before the map lookup
    (.shutdown (:ref @db-))
    (dosync
     (alter db- assoc :ref nil))
    (log/info "Stopped database")
    (finally
      (dosync
       (alter db- assoc :stopping false)))))

In the try block, I log, then call .shutdown, then log again. If the first log fails (I/O exceptions can happen), then (:stopping db-) is set to false, which unblocks it and is fine. .shutdown is a void function from Neo4j, so I don't have to evaluate a return value. If it fails, (:stopping db-) is set to false, so that's fine too. Then I set the (:ref db-) to nil. What if that fails? (:stopping db-) is set to false, but the (:ref db-) is left hanging. So that's a hole. Same case with the second log call. What am I missing?

Would this be better if I just used Clojure's locking primitives instead of a ref dance?

2 Answers

This is actually a natural fit for a simple lock:

(locking x
  (do-stuff))

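x being the object to synchronize on.

To elaborate: starting and stopping a service is a side effect, and side effects should not be initiated from inside a transaction, except possibly as agent actions. Here, though, locks are exactly what the design calls for. Note that there is nothing wrong with using them in Clojure when they are a good fit for the problem at hand; in fact, I would say locking is the canonical solution here. (See Stuart Halloway's Lancet, introduced in Programming Clojure (1st ed.), for an example of a Clojure library that uses locks and has seen widespread use, mostly in Leiningen.)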
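As a rough sketch of how locking could guard the whole start/stop lifecycle from the question: the names `lifecycle-lock`, `running`, `start!` and `stop!` below are hypothetical, and the actual start and stop work is assumed to be passed in as zero-argument functions.

```clojure
;; Hypothetical names; any object can serve as the monitor.
(def lifecycle-lock (Object.))
(def running (atom false))

(defn start! [start-fn]
  (locking lifecycle-lock
    (when @running
      (throw (IllegalStateException. "Service already running.")))
    ;; The side effect happens while the lock is held, so no separate
    ;; "starting" flag is needed.
    (start-fn)
    (reset! running true)))

(defn stop! [stop-fn]
  (locking lifecycle-lock
    (when-not @running
      (throw (IllegalStateException. "Service already stopped.")))
    (stop-fn)
    (reset! running false)))
```

Because the lock is held for the whole operation, a concurrent caller blocks until the first one finishes and then sees the updated flag. If `start-fn` throws, the flag is left untouched and the lock is released.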

Update: adding fail-fast behaviour:

This is still a good fit for a lock, namely java.util.concurrent.locks.ReentrantLock (see its Javadoc):

(import java.util.concurrent.locks.ReentrantLock)

(def lock (ReentrantLock.))

(defn start []
  (if (.tryLock lock)
    (try
      (do-stuff)
      (finally (.unlock lock)))
    (do-other-stuff)))

(do-stuff) is executed if the lock is acquired successfully; otherwise, (do-other-stuff) happens. In either case, the current thread does not block.
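If start and stop should both fail fast against each other, one option is to route them through a single shared lock. The helper below is a minimal sketch; `service-lock` and `try-exclusively` are hypothetical names, and returning `:busy` is just one possible way to signal that the lock was taken.

```clojure
(import 'java.util.concurrent.locks.ReentrantLock)

;; One lock shared by every lifecycle operation of the service.
(def service-lock (ReentrantLock.))

(defn try-exclusively
  "Hypothetical helper: calls (f) if the lock is free, otherwise returns
  :busy immediately without blocking."
  [^ReentrantLock lock f]
  (if (.tryLock lock)
    (try
      (f)
      (finally (.unlock lock)))
    :busy))
```

Note that ReentrantLock is reentrant: a nested call on the thread that already holds the lock acquires it again rather than returning `:busy`. Only other threads are turned away.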

answered 2013-04-21T13:45:39.173

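This sounds like a good use case for agents: they let you serialize changes to a piece of mutable state, and the Clojure agents documentation has a good overview. You can use an error handler and agent-error to deal with exceptions, without having to worry about locks or race conditions.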

(def service (agent {:status :stopped}))

(defn start-service [{:keys [status] :as curr}]
  (if (= :stopped status)
    (do 
      (println "starting service")
      {:status :started})
    (do 
      (println "service already running")
      curr)))

 ;; start the service like this
 (send-off service start-service)

 ;; gets the current status of the service
 @service
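The error handling mentioned above might look something like this. It is a sketch only: `guarded-service`, `last-error` and `failing-start` are hypothetical names, and the failure is simulated by throwing. An agent created with an `:error-handler` defaults to the `:continue` error mode, so a failed action leaves the state unchanged and the agent keeps accepting actions.

```clojure
;; Hypothetical place to record the most recent failure.
(def last-error (atom nil))

(def guarded-service
  (agent {:status :stopped}
         ;; Called with the agent and the exception when an action throws.
         :error-handler (fn [_agent ex] (reset! last-error ex))))

(defn failing-start
  "Hypothetical action that simulates a start failure."
  [_curr]
  (throw (IllegalStateException. "could not start service")))

(send-off guarded-service failing-start)
;; Block until the actions sent so far from this thread have run.
(await guarded-service)
```

After the failed action, `@guarded-service` still holds the previous state and `@last-error` holds the exception. Without an error handler the agent is in `:fail` mode instead: `agent-error` returns the exception and the agent rejects new actions until `restart-agent` is called.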
answered 2013-04-22T01:24:55.380