1

我正在构建一个使用 celery 执行异步任务的进度条应用程序。该任务以 GraphQL 突变开始,并每秒更新自己的状态:

@celery.task(bind=True, name='mock_analyzing')
def mock_analyzing(self, up_to=20):
    for i in range(up_to):
        self.update_state(
            state='PROGRESS',
            meta={
                'done': i,
                'total': up_to
            }
        )
        time.sleep(1)
    return 'SUCCESS'

当我想通过 GraphQL 订阅发布该状态时,属性设置不正确。我究竟做错了什么?

class Subscription(graphene.ObjectType):
    file_status = graphene.String(file_id=graphene.ID())

    def resolve_file_status(root, info, file_id):
        task = mock_analyzing.AsyncResult(file_id)
        return Observable.interval(1000)\
                         .map(lambda done = task.info.get('done'), total = task.info.get('total'): {id: file_id, 'done': done, 'total': total})\
                         .take_while(lambda i: not task.ready())

我想我不明白 Observable 的权利。有人能帮我吗?提前致谢

4

0 回答 0