我有一些关于使用 Python 和 Redis 创建作业队列应用程序以运行异步命令的一般性问题。这是我到目前为止生成的代码:
def queueCmd(cmd):
r_server.rpush("cmds", cmd)
def printCmdQueue():
print r_server.lrange("cmds", 0 , -1)
def work():
print "command being consumed: ", r_server.lpop("cmds")
return -1
def boom(info):
print "pop goes the weasel"
if __name__ == '__main__':
r_server = redis.Redis("localhost")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
printCmdQueue()
pool = Pool(processes=2)
print "cnt:", +r_server.llen("cmds")
#while r_server.llen("cmds") > 0:
while True:
pool.apply_async(work, callback=boom)
if not r_server.lrange("cmds", 0, -1):
#if r_server.llen("cmds") == 0:
print "Terminate pool"
pool.terminate()
break
printCmdQueue()
首先,我是否认为如果我需要与经理进行任何沟通,我想通过回调来做到这一点?我在此使用中看到的快速示例将异步调用存储在结果中并通过 result.get(timeout=1) 访问它。通过交流,我的意思是把东西放回redis列表。
编辑:如果命令以异步方式运行并且我在 main 内的结果超时,那是工人超时还是只是管理器内的操作超时?如果只有经理我不能用它来检查工人的退出代码吗?
接下来,此代码产生以下输出:
['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
Terminate pool
command being consumed: None
pop goes the weasel
pop goes the weasel
pop goes the weasel
[]
为什么即使我一次弹出一个命令,工作人员也想一次使用多个命令?在类似的情况下,这并不总是很好地结束,有时需要 ctrl+c。为了对付他,我清理了队列,然后再去。我认为这与 apply_sync() 以及是否退出循环有关。我想知道是否需要在工人方面发生更多事情?
如果我将 ifs 更改为注释掉的 ifs,我会得到:
ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'
这似乎是检查是否需要中断的更好方法,但似乎函数有时会返回字符串文字?
任何有关改进这一点的建议将不胜感激。我只是想创建一个类似于 linux 机器上的服务/守护进程的管理器。它将用于从 redis 列表中获取作业(当前是命令,但可能更多)并将结果返回到 redis 列表中。然后,GUI 将与此管理器交互以获取队列状态并返回结果。
谢谢,
编辑:
我意识到我有点傻。我不需要从工作人员访问 redis 服务器,这会导致一些错误(特别是 ValueError)。
为了解决这个问题,现在的循环是:
while not r_server.llen("cmds") == 0:
cmd = r_server.lpop("cmds")
pool.apply_async(work, [cmd])
在这些行之后,我调用pool.close()
. 我用os.getpid()
andos.getppid()
来检查我确实有多个孩子跑来跑去。
如果这听起来像是创建使用 redis 的经理/工作人员应用程序的好方法,我仍然会很高兴听到。