我正在尝试使用 celery 在一组服务器上安排和运行任务。每个任务都需要长时间运行(几个小时),并且涉及使用子进程来调用具有给定输入的某个程序。该程序在 stdout 和 stderr 中产生大量输出。
有什么方法可以近乎实时地向客户端显示程序产生的输出吗?流式传输输出,以便客户端可以在不登录服务器的情况下观看服务器上运行的任务喷出的输出?
您没有指定许多要求和约束。我假设你已经在某个地方有一个 redis 实例。
你可以做的是逐行读取其他进程的输出,并通过redis发布:
这是一个示例,您可以将echo
数据写入文件/tmp/foo
进行测试:
import redis
redis_instance = redis.Redis()
p = subprocess.Popen(shlex.split("tail -f /tmp/foo"), stdout=subprocess.PIPE)
while True:
line = p.stdout.readline()
if line:
redis_instance.publish('process log', line)
else:
break
在一个单独的过程中:
import redis
redis_instance = redis.Redis()
pubsub = redis_instance.pubsub()
pubsub.subscribe('process log')
while True:
for message in pubsub.listen():
print message # or use websockets to comunicate with a browser
如果您希望该过程结束,您可以在 celery 任务完成后发送“退出”。
您可以使用不同的通道(中的字符串subscribe
)将输出与不同的进程分开。
如果需要,您还可以将日志输出存储在 redis 中,
redis_instance.rpush('process log', message)
然后将其全部检索。
我看到如何做到这一点的一种方法是编写将用于 stderr 和 stdout 的自定义 Logger(请参阅文档:
from celery.app.log import Logger
Logger.redirect_stdouts_to_logger(MyLogger())
您的记录器可以将数据保存到数据库、Memcached、Redis 或您将用于获取数据的任何共享存储中。
我不确定logger的结构,但我想这样的事情会起作用:
from logging import Logger
class MyLogger(Logger):
def log(lvl, msg):
# Do something with the message
这是一个老问题,但它仍然是关于这个特定主题的唯一结果。
下面是我的做法,我创建了一个简单的类文件对象,通过 Redis 发布到特定通道
class RedisFileObject(object):
def __init__(self, _key):
self.connection = redis.Redis()
self.key = _key
self.connection.publish('debug', 'Created channel %s' % self.key)
def write(self, data):
self.connection.publish(self.key, data)
def close(self):
pass
我有一个 BaseTask,我的所有任务都从中继承了各种功能,包括。这个用 Redis 类文件对象替换 stdout 和 stderr 。
def capture_output(self):
sys.stdout = RedisFileObject(self.request.id)
sys.stderr = RedisFileObject(self.request.id)
从那里写入 stdout/stderr 的任何内容都将转发到以任务 ID 命名的 Redis 通道。