28

我想使用异步调用 loop.run_in_executor 在 Executor 中启动一个阻塞函数,然后稍后取消它,但这似乎对我不起作用。

这是代码:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

我希望上面的代码只允许阻塞函数输出:

blocking 0/5
blocking 1/5

然后查看非阻塞函数的输出。但是相反,即使在我取消之后,阻塞的未来仍在继续。

可能吗?还有其他方法吗?

谢谢

编辑:更多关于使用 asyncio 运行阻塞和非阻塞代码的讨论:How to interface blocking and non-blocking code with asyncio

4

2 回答 2

28

在这种情况下,一旦它实际开始运行,就无法取消Future它,因为您依赖于 的行为concurrent.futures.Future,并且它的文档说明了以下内容

cancel()

尝试取消通话。如果调用当前正在执行并且无法取消,则该方法将返回False,否则该调用将被取消并且该方法将返回True

因此,取消成功的唯一时间是任务仍在Executor. 现在,您实际上是在使用 aasyncio.Future包裹 a concurrent.futures.Future,实际上,如果您在调用 之后尝试返回 by ,即使底层任务实际上已经在运行,asyncio.Future返回的 byloop.run_in_executor()也会引发 a 。但是,它实际上不会取消.CancellationErroryield fromcancel()Executor

如果您需要实际取消任务,则需要使用更常规的方法来中断线程中运行的任务。你如何做到这一点的细节取决于用例。对于您在示例中提供的用例,您可以使用threading.Event

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
于 2014-10-16T22:30:33.587 回答
1

由于线程共享同一进程的内存地址空间,因此没有安全的方法来终止正在运行的线程。这就是为什么大多数编程语言不允许杀死正在运行的线程的原因(围绕这个限制有很多丑陋的黑客攻击)。

Java很难学会它。

一个解决方案包括在单独的进程而不是线程中运行您的函数并优雅地终止它。

Pebble库提供了一个类似于支持取消concurrent.futures运行的接口。Futures

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  
于 2017-08-08T10:16:36.967 回答