我很难弄清楚如何让从 tokio 消耗的消息mpsc::channel
同时运行。我目前的方法是调用into_stream()
调用recv()
,然后调用for_each_concurrent
这个包装的对象。据我了解,这应该在消息进入时同时执行代码,而不是按顺序执行(如您使用时recv()
)。
下面的示例省略了不相关的细节,但总体思路应该很容易看到。
let upload_id = s3_response.upload_id.unwrap();
let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
let txb1 = txb.clone();
match part {
Some((path, part_number)) => {
let opts = options();
// ...
let _ = fs::remove_file(path);
let _ = txb1.send((completed_part, upload_id.clone())).await;
log::info!("Finished uploading part: {}", part_number);
}
None => {
log::info!("Finished uploading part files");
}
}
});
这会导致编译器错误,提示无法将事物 (upload_id
和) 移入闭包中。我已经尝试了针对这些问题提出的各种更普遍的方法,但似乎没有一种方法适用于这种特定情况。我在这里错过了什么重要的东西吗?示例错误如下。txb
FnMut
let upload_id = s3_response.upload_id.unwrap();
| --------- captured outer variable
...
131 | let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
| ___________________________________________________________________-_________________^
| | __________________________________________________________________|
| ||
132 | || let uid = upload_id.clone();
| || ---------
| || |
| || move occurs because `upload_id` has type `String`, which does not implement the `Copy` trait
| || move occurs due to use in generator
133 | || let txb1 = txb.clone();
134 | || match part {
... ||
161 | || }
162 | || });
| || ^
| ||_________|
| |__________captured by this `FnMut` closure
| move out of `upload_id` occurs here
error[E0507]: cannot move out of `txb`, a captured variable in an `FnMut` closure
--> src/lib.rs:131:84
|
45 | let (txb, mut rxb) = mpsc::channel(1); // builder channel
| --- captured outer variable
...
131 | let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
| ___________________________________________________________________-_________________^
| | __________________________________________________________________|
| ||
132 | || let uid = upload_id.clone();
133 | || let txb1 = txb.clone();
| || ---
| || |
| || move occurs because `txb` has type `tokio::sync::mpsc::Sender<(CompletedPart, String)>`, which does not implement the `Copy` trait
| || move occurs due to use in generator
134 | || match part {
... ||
161 | || }
162 | || });
| || ^
| ||_________|
| |__________captured by this `FnMut` closure
| move out of `txb` occurs here
任何帮助深表感谢。谢谢!