3

我正在将 Tokio 用于一些异步 Rust 代码,并且遇到了问题。我有一些需要访问连接池的任务,并且连接池的性质意味着一次只能运行一个固定数量(NUMCPUS) - 所有其他请求都会阻塞,直到有空闲连接。

目前,我只是在使用 task::spawn_blocking,这很有效。但是,这样做的缺点是,一旦有 512 个请求在连接池上阻塞,Tokio 的整个阻塞池就会耗尽,所有阻塞任务都只是排队。这可以防止代码中其他地方不依赖连接池的任何 spawn_blocking 调用也运行。

有没有办法告诉 Tokio 保持一组特定的阻塞任务分开,一次只产生 N 个,同时仍然允许不相关的阻塞任务运行而不排队?

spawn_blocking 文档建议将 Rayon 用于 CPU 密集型任务,但是 a) 不清楚如何将 Rayon 与 Tokio 集成 b) 我的任务无论如何都不是 CPU 密集型的。

4

1 回答 1

1

您可以使用Semaphore: 使用并发允许的任务数对其进行初始化,并让每个任务在处理之前获取信号量并在完成后释放它。类似(未经测试):

use tokio::sync::Semaphore;

struct Pool {
    sem: Semaphore,
}

impl Pool {
    fn new (size: usize) -> Self {
        Pool { sem: Semaphore::new (size), }
    }

    async fn spawn<T> (&self, f: T) -> T::Output
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let handle = self.sem.acquire().await;
        f.await
    }
}
于 2021-10-13T06:56:39.963 回答