1

我正在使用负载平衡视图提交任务。

我希望能够从不同的客户端连接并通过提交的函数和参数查看剩余的任务。

例如:

def someFunc(parm1, parm2):
    return parm1 + parm2

lbv = client.load_balanced_view()
async_results = []
for parm1 in [0,1,2]:
    for parm2 in [0,1,2]:
        ar = lbv.apply_async(someFunc, parm1, parm2)
        async_results.append(ar)

从我提交的客户端,我可以根据它们在 async_results 数组中的顺序找出哪个结果与哪个函数调用一起使用。

我想知道的是,如果我使用 queue_status 或 history 命令从不同的客户端检索结果以获取 msg_id 并使用 client.get_result 命令检索结果,我如何找出与 msg_id 关联的函数和参数。

4

1 回答 1

1

这些东西被腌制,并存储在集线器数据库的“缓冲区”中。如果您想查看它们,您必须从数据库中获取这些缓冲区,然后解压缩它们。

假设您有一个 msg_ids 列表,您可以通过以下方式为所有这些请求重建 f、args 和 kwargs:

# msg_ids is a list of msg_id, however you decide to get that
from IPython.zmq.serialize import unpack_apply_message

# load the buffers from the hub's database:
query = rc.db_query({'msg_id' : {'$in' : msg_ids } }, keys=['msg_id', 'buffers'])
# query is now a list of dicts with two keys - msg_id and buffers

# now we can generate a dict by msg_id of the original function, args, and kwargs:
requests = {}
for q in query:
    msg_id = 
    f, args, kwargs = unpack_apply_message(q['buffers'])
    requests[q['msg_id']] = (f, args, kwargs)

由此,您应该能够根据任务的功能和参数来关联任务。

一个警告:由于 f 已经通过酸洗,通常比较f is original_f会是 False,所以你必须做更松散的比较,例如f.__module__ + f.__name__或类似。

更详细一点,这里有一个示例,它生成一些请求,然后根据函数和参数对它们进行重构和关联,这些函数和参数对原始请求的外观有一些先验知识。

于 2012-12-16T22:50:20.037 回答