4

需要注册全局热键。例如,f4f8。使用键盘库,当第一个回调没有返回时,下一个不会调用。

换句话说,像这样的日志

pressed f4
end for f4
pressed f8
end for f8

但我想喜欢这个

pressed f4
pressed f8
end for f4
end for f8

演示代码

# pip install keyboard
from keyboard import add_hotkey, wait
from time import sleep

def on_callback(key):
    print('pressed', key)
    sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: on_callback("f4"))
add_hotkey("f8", lambda: on_callback("f8"))

wait('esc')

我尝试使用asyncio,但没有任何改变

pressed f4
end for f4
pressed f8
end for f8
from keyboard import add_hotkey, wait
import asyncio

async def on_callback(key):
    print('pressed', key)
    await asyncio.sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: asyncio.run(on_callback("f4")))
add_hotkey("f8", lambda: asyncio.run(on_callback("f8")))

wait('esc')

更新 1

键盘库的开发人员建议使用call_later为每个函数创建新线程的函数,callback它的工作原理和我想要的一样。

但是有没有办法在同一个线程中完成这样的任务(使用asyncio)?我没有成功。

# example with 'call_later' function
from keyboard import add_hotkey, wait, call_later
from time import sleep

def on_callback(key):
    print('pressed', key)
    sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: call_later(on_callback, args=("f4",)))
add_hotkey("f8", lambda: call_later(on_callback, args=("f8",)))

wait('esc')

更新 2

现在它看起来像下面(github上的完整代码)。我似乎为了等待http请求而创建新线程是太繁重的操作。因此我想在当前线程中使用 asyncio 并同时继续处理其他热键。

from googleapiclient.discovery import build
from os import getenv
from settings import get_settings
from loguru import logger
import keyboard

class ScriptService():

    def __init__(self):
        # ...
        self._script = AppsScript(id)
        self._hotkeys = values["hotkeys"]

    def _register_hotkeys(self):
        self._add_hotkey(self._hotkeys["reload"], self._on_reload)
        for item in self._hotkeys["goofy"]:
            k, f = item["keys"], item["function"]
            self._add_hotkey(k, self._on_callback, args=(f, k))

    def _add_hotkey(self, keys, callback, args=()):
        # lambda bug: https://github.com/boppreh/keyboard/issues/493
        keyboard.add_hotkey(keys, lambda: keyboard.call_later(callback, args))

    def _on_callback(self, function, keys):
        response = self._script.run(function)

class AppsScript():

    def __init__(self, id: str):
        self._name = getenv("API_SERVICE_NAME")
        self._version = getenv("API_VERSION")
        self._id = id

    def run(self, function: str):
        body = {"function": function}
        with build(self._name, self._version, credentials=get_credentials()) as service:
            # http request
            return service.scripts().run(scriptId=self._id, body=body).execute()

4

1 回答 1

1

不幸的是,您使用的库实际上都不是可等待的,因此将它们与 asyncio 一起使用将是一个挑战。您可以从 google 库中提取实际的 http 调用,然后稍后使用与 asyncio 兼容的库来实现您自己的客户端,但为了避免启动新线程的费用,这需要做很多工作。

幸运的是,已经有一种方法可以避免启动线程的代价:使用工作线程池。在这种方法中,我们不是为每个回调立即启动一个新线程,而是将任务添加由我们提前启动的线程池服务的任务队列中。这样,我们只需支付启动线程的费用,然后我们只需支付将请求序列化到线程的费用——这不是什么都没有,但它比启动线程要少。

虽然可以让 asyncio 管理线程池,但在这种情况下,它根本没有任何优势,因为您的代码中没有其他内容是可等待的。(如果您确实想这样做,您将使用loop.run_in_exeuctor(),注意不要按照此问题中的说明重新创建池。)

这是一些虚拟代码,需要适应您的类:

from threading import Thread
from queue import Queue
from time import sleep
from random import randint


def process(task):
    print(task["name"])
    sleep(3 + randint(0, 100) / 100)
    print(f"Task {task['name']} done")


class WorkerThread(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        print("Started worker thread")
        self._open = True

    def run(self):
        while self._open:
            task = self.queue.get()
            process(task)
            self.queue.task_done()

    def close(self):
        print("Closing", self)
        self._open = False


task_queue = Queue()
THREADS = 6
worker_threads = [WorkerThread(task_queue) for _ in range(THREADS)]
for worker in worker_threads:
    worker.setDaemon(True)
    worker.start()


print("Sending one task")
task_queue.put({"name": "Task 1"})
sleep(1)

print("Sending a bunch of tasks")
for i in range(1, 15):
    task_queue.put({"name": f"Task {i}"})
print("Sleeping for a bit")
sleep(2)

print("Shutting down")

# wrap this in your exit code
task_queue.join()  # wait for everything to be done
for worker in worker_threads:
    worker.close()

还有其他方法,但我认为在这里明确写出来更清楚。请注意,我假设您的代码不受 cpu 限制,因此使用线程而不是进程是有意义的。

顺便说一句,这看起来很像. 之类的最小实现celery,这可能对您的需求有点过分,但我们可能会感兴趣。

顺便说一句,我不懂俄语,但这看起来是一个有趣的项目。

于 2021-10-04T12:43:26.650 回答