我正在通过多个通道处理数据,每个通道都输入下一个通道(管道处理)。我最终得到一个spawn
看起来像这样的顶部:
let future = async move {
while let Ok(msg) = r.recv().await {
forwarder.receive(msg).await;
}
};
executor_pool::ExecutorPool::spawn(future).detach();
Forwarder
看起来像这样:
Forwarder {
pub fn validate_sequence(&mut self, msg: TestMessage) -> Result<TestMessage, TestMessage>
pub async fn handle_action(&mut self, cmd: TestMessage);
pub async fn handle_notification(&mut self);
pub async fn receive(&mut self, cmd: TestMessage) {
match self.handle_config(cmd) {
Ok(_) => (),
Err(msg) => match self.validate_sequence(msg) {
Ok(msg) => {
self.handle_action(msg).await;
self.handle_notification().await;
},
Err(msg) => panic!("{} sequence error: expecting {} not cmd {:#?}", self.header(), self.next_seq, msg),
},
}
}
}
两者都handle_action
调用handle_notification
一个发件人,这是另一个async fn
。我的担忧有两方面。到 send (或任何其他)的整个路径async fn
似乎需要一个 async/await 包装器。就我而言,我在发送时深 3 级。这似乎有点难看,特别是如果我必须进行任何重构。其次,每个级别的 async/await 是否存在运行时成本,或者编译器是否足够聪明以崩溃这些?如果它有助于使这更具体,可以将其视为音频处理,其中第一阶段解码,下一个阶段进行调平,下一个阶段进行混合,然后最后阶段进行编码。
为了扩展重构关注点,让我们看一下重构 for 循环。
pub async fn handle_action(&mut self, cmd: FwdMessage) {
match cmd {
FwdMessage::TestData(_) => {
for sender in &self.senders {
for _ in 0 .. self.forwarding_multiplier {
sender.send(FwdMessage::TestData(self.send_count)).await.ok();
self.send_count += 1;
}
}
},
_ => panic!("unhandled action"),
}
}
for
我们不想使用循环,而是使用iterator
. 然而,这将需要一个异步闭包——这是我什至不知道如何表达的东西。