我认为您正在寻找结构化并发,特别是全局隐式范围。可悲的是,没有人能找到一个好的解决方案,所以暂时放弃了这方面的工作。
同时,这是一种可能的解决方法。调用spawn_keep_alive
而不是tokio::spawn
任何应该使程序保持活动状态的任务:
操场
use parking_lot::Mutex;
use std::{
future::Future,
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
use tokio::{sync::oneshot, task::JoinHandle};
static KEEPALIVE_COUNT: AtomicU32 = AtomicU32::new(0);
static KEEPALIVE_SENDER: Mutex<Option<oneshot::Sender<()>>> = parking_lot::const_mutex(None);
pub fn spawn_keep_alive<T>(task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
KEEPALIVE_COUNT.fetch_add(1, Ordering::Relaxed);
tokio::spawn(async {
let result = task.await;
if KEEPALIVE_COUNT.fetch_sub(1, Ordering::Relaxed) == 1 {
let sender = KEEPALIVE_SENDER.try_lock().unwrap().take().unwrap();
sender.send(()).unwrap();
}
result
})
}
async fn do_update() {
let mut stream = tokio::time::interval(Duration::from_millis(100));
stream.tick().await;
for _ in 0..10 {
println!("Foo");
stream.tick().await;
}
spawn_keep_alive(async {
tokio::time::sleep(Duration::from_millis(1000)).await;
println!("I'm aliveeee!");
});
spawn_keep_alive(async {
tokio::time::sleep(Duration::from_millis(2000)).await;
println!("Don't forget about me!");
});
}
#[tokio::main]
async fn main() {
let (send, recv) = oneshot::channel();
*KEEPALIVE_SENDER.try_lock().unwrap() = Some(send);
spawn_keep_alive(do_update());
// Wait for all keep-alive tasks to finish
recv.await.unwrap();
}
(KEEPALIVE_SENDER
可能效率更高,但只使用了两次)。