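I have been evaluating ZooKeeper as a simple message queue, and I wrote two very simple scripts: an mq feeder and an mq consumer. The feeder below just pushes 20 jobs onto the queue and then watches the queue state (the jobs being consumed):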
from kazoo.client import KazooClient
zk = KazooClient(hosts='xxx')
zk.start()
for i in xrange(20):
    zk.create("/queue/%s" % i, b"%s" % i)
while 1:
    print zk.get_children('/queue')
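The consumer below is started several times (up to 3 concurrent processes in my tests). It fetches the job list, iterates over it to find an unlocked job, processes it (sleeping a random number of seconds to simulate some work), and when done deletes the job and then deletes the lock: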
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from time import sleep
import random
zk = KazooClient(hosts='xxx')
zk.start()
zk.ensure_path("/locks")
zk.ensure_path("/queue")
while 1:
    jobs = sorted(zk.get_children('/queue'))
    if jobs:
        for i in jobs:
            print "Checking job: %s" % i
            try:
                zk.create("/locks/%s" % i)
            except NodeExistsError:
                print "Job is locked, skipping!"
                pass
            else:
                print "Job is unlocked, processing."
                sleep(random.randrange(5))
                zk.delete("/queue/%s" % i)
                print "Deleted processed job, deleting the lock."
                zk.delete("/locks/%s" % i)
                pass
    else:
        print "There's no locks in the queue."
        pass
The problem I'm seeing and can't track down is that the consumer processes exit with:
Traceback (most recent call last):
  File "zk_consumer.py", line 24, in <module>
    zk.delete("/queue/%s" % i)
  File "/Library/Python/2.7/site-packages/kazoo/client.py", line 1055, in delete
    return self.delete_async(path, version).get()
  File "/Library/Python/2.7/site-packages/kazoo/handlers/threading.py", line 107, in get
    raise self._exception
kazoo.exceptions.NoNodeError: ((), {})
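The last remaining process then keeps checking a single job that stays in the queue but is always locked. Clearly I have a logic error here that I think leads to a race condition, but I've spent some time on it and can't spot it. Am I doing something wrong here, or is ZooKeeper just not a viable solution for a simple job queue?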
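One interleaving that would be consistent with the traceback above can be sketched without a ZooKeeper server. The snippet below is only an illustration under stated assumptions: `FakeZk`, `simulate_race`, and the two exception classes are hypothetical in-memory stand-ins, not kazoo's API, and the snippet is written for Python 3 rather than the Python 2.7 of the scripts. It models a consumer B that walks a stale job list after a consumer A has already finished the same job and removed its lock.

```python
class NoNodeError(Exception):
    """Hypothetical stand-in for kazoo.exceptions.NoNodeError."""


class NodeExistsError(Exception):
    """Hypothetical stand-in for kazoo.exceptions.NodeExistsError."""


class FakeZk(object):
    """In-memory stand-in for the few client calls the consumer uses."""

    def __init__(self):
        self.nodes = set()

    def create(self, path):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes.add(path)

    def delete(self, path):
        if path not in self.nodes:
            raise NoNodeError(path)
        self.nodes.remove(path)

    def get_children(self, parent):
        prefix = parent + "/"
        return [p[len(prefix):] for p in self.nodes if p.startswith(prefix)]


def simulate_race():
    zk = FakeZk()
    zk.create("/queue/0")

    # Consumer B lists the queue and keeps this snapshot for its for-loop.
    stale_jobs_b = sorted(zk.get_children("/queue"))

    # Consumer A completes a full cycle on job 0 before B reaches it.
    zk.create("/locks/0")
    zk.delete("/queue/0")
    zk.delete("/locks/0")

    # B walks its stale snapshot: the lock is gone, so create() succeeds,
    # but the queue node is gone too, so delete() raises.
    events = []
    for job in stale_jobs_b:
        zk.create("/locks/%s" % job)
        events.append("B locked %s" % job)
        try:
            zk.delete("/queue/%s" % job)
        except NoNodeError:
            # Caught here only to record it; the consumer script does not
            # catch it, so the process would exit at this point.
            events.append("NoNodeError on /queue/%s" % job)

    # B never reaches the lock cleanup, so its lock node is left behind.
    leftover_locks = sorted(zk.get_children("/locks"))
    return events, leftover_locks


if __name__ == "__main__":
    print(simulate_race())
```

If this is what happens, the lock B created outlives the crashed process because it is a regular node. A leftover lock like that might also make a later job with the same name look permanently locked, for instance if the feeder is run again, though that part is a guess rather than something the traceback shows.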