我设法找到了监控工作流程的解决方案。我需要一次计算工作流程。所以,我是通过信号机制做到的。
Huey 提供信号装饰器来触发某个信号,例如SIGNAL_EXECUTING、SIGNAL_COMPLETE、SIGNAL_ERROR。
在每个执行过程中,都会在 Redis 哈希中存储一个具有特定前缀的新键。当该过程结束时,密钥被删除。因此,要获取执行任务的数量,您可以调用get_executing_count()
.
我的工作代码:
from huey import RedisHuey
from huey.constants import EmptyData
from huey.signals import SIGNAL_EXECUTING, SIGNAL_COMPLETE, SIGNAL_ERROR, SIGNAL_REVOKED
EXECUTING_PREFIX = "executing"
redis_addr = os.getenv("REDIS", "localhost")
huey = RedisHuey('entrypoint', host=redis_addr)
def get_executing_count():
global huey
matching = list(
filter(
lambda key: key.decode().startswith(f"{EXECUTING_PREFIX}-"),
huey.storage.conn.hgetall(huey.storage.result_key).keys()
)
)
return len(matching)
@huey.signal(SIGNAL_EXECUTING)
def task_signal_executing(signal, task):
global huey
huey.storage.put_data(f"{EXECUTING_PREFIX}-{task.id}", 1)
@huey.signal(SIGNAL_COMPLETE)
def task_signal_complete(signal, task):
global huey
huey.storage.delete_data(f"{EXECUTING_PREFIX}-{task.id}")
@huey.signal(SIGNAL_ERROR, SIGNAL_REVOKED)
def task_signal_error(signal, task, exc=None):
global huey
huey.storage.delete_data(f"{EXECUTING_PREFIX}-{task.id}")
价值检索我做以下方式
from entrypoint import get_executing_count
get_executing_count()