我找到了一个解决方案,可以将任务的参数format_task
作为对象(列表/字典等)而不是截断的字符串表示形式。
方法的argsrepr
andkwargsrepr
参数apply_async()
允许为任务的参数指定自定义表示。
我创建了一个自定义Task类,覆盖了delay
这样的方法:
import json
from celery import Task
class FlowerTask(Task):
def delay(self, *args, **kwargs):
argsrepr, kwargsrepr = [], {}
for arg in args:
if isinstance(arg, bytes):
argsrepr.append("<binary content>")
elif isinstance(arg, list):
argsrepr.append("<list ({} items)>".format(len(arg)))
elif isinstance(arg, dict):
argsrepr.append("<dict ({} keys)>".format(len(arg)))
else:
# Format your args the way you prefer
for key, value in kwargs.items():
# Format your kwargs the same way as above
# if ... :
# kwargsrepr.append(...)
# Create the task
new_task = super().s(*args, **kwargs)
# Use our representations as JSON
return new_task.apply_async(
argsrepr=json.dumps(argsrepr),
kwargsrepr=json.dumps(kwargsrepr)
)
然后我使用这个类作为我的任务的基础,使用base
参数:
@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
return "OK !"
这样,任务的参数表示形式存储为 JSON,之后可以在 Flower 中加载format_task()
并将它们用作对象而不是字符串:
def format_task(task):
argsrepr = json.loads(task.args)
kwargsrepr = json.loads(task.kwargs)
if not argsrepr:
task.args = "( )"
else:
task.args = ', '.join(argsrepr)
if not kwargsrepr:
task.kwargs = "( )"
else:
task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())
# [...]
这样,args 显示如下:
参数: <list (3 items)>, <binary content>
夸格斯: callback = <function>, items = <dict (5 items)>