6

如何设置阻塞函数以在执行程序中运行,结果无关紧要,因此主线程不应该等待或被它拖慢。

老实说,我不确定这是否是正确的解决方案,我想要的只是将某种类型的处理队列与主进程分开,这样它就不会阻止服务器应用程序返回请求,因为这样Web 服务器类型为许多请求运行一个工作人员。

最好我想远离像 Celery 这样的解决方案,但如果这是最优化的,我愿意学习它。

这里的上下文是一个异步 Web 服务器,它生成带有大图像的 pdf 文件。

app = Sanic()
#App "global" worker
executor = ProcessPoolExecutor(max_workers=5)

app.route('/')
async def getPdf(request):
  asyncio.create_task(renderPdfsInExecutor(request.json))
  #This should be returned "instantly" regardless of pdf generation time
  return response.text('Pdf being generated, it will be sent to your email when done')

async def renderPdfsInExecutor(json):
  asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, json)

def syncRenderPdfs(json)
  #Some PDF Library that downloads images synchronously
  pdfs = somePdfLibrary.generatePdfsFromJson(json)
  sendToDefaultMail(pdfs)

上面的代码给出了错误(是的,它以管理员身份运行):

PermissionError [WinError 5] Access denied
Future exception was never retrieved

奖励问题:通过在执行程序中运行 asyncio 循环,我能获得什么吗?因此,如果它一次处理多个 PDF 请求,它将在它们之间分配处理。如果是,我该怎么做?

4

2 回答 2

7

好的,所以首先有一个误解。这个

async def getPdf(request):
    asyncio.create_task(renderPdfsInExecutor(request.json))
    ...

async def renderPdfsInExecutor(json):
    asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, json)

是多余的。做就够了

async def getPdf(request):
    asyncio.get_running_loop.run_in_executor(executor, syncRenderPdfs, request.json)
    ...

或者(因为你不想等待)更好

async def getPdf(request):
    executor.submit(syncRenderPdfs, request.json)
    ...

现在你得到的问题是因为syncRenderPdfsthrows PermissionError。它没有被处理,所以 Python 会警告你“嘿,一些后台代码抛出了一个错误。但是代码不属于任何人,所以这到底是怎么回事?”。这就是为什么你得到Future exception was never retrieved. 您对 pdf 库本身有问题,而不是 asyncio。一旦你解决了这个内部问题,确保安全也是一个好主意:

def syncRenderPdfs(json)
    try:
        #Some PDF Library that downloads images synchronously
        pdfs = somePdfLibrary.generatePdfsFromJson(json)
        sendToDefaultMail(pdfs)
    except Exception:
        logger.exception('Something went wrong')  # or whatever

您的“权限被拒绝”问题是完全不同的事情,您应该调试它和/或为此发布一个单独的问题。

至于最后一个问题:是的,executor 会在 worker 之间排队并平均分配任务。

编辑:正如我们在评论中所说,实际问题可能出在您使用的 Windows 环境中。或者更准确地说,使用 ProcessPoolExecutor,即生成进程可能会更改权限。我建议使用 ThreadPoolExecutor,假设它在平台上运行良好。

于 2019-02-18T14:37:24.507 回答
0

您可以查看 asyncio.gather(*tasks) 以并行运行多个。

请记住,并行任务只有在它们是 io 绑定而不是阻塞的情况下才能很好地工作。

来自 python 文档的示例(https://docs.python.org/3/library/asyncio-task.html):

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())
于 2019-02-21T14:21:35.247 回答