0

我很难弄清楚如何让从 tokio 消耗的消息mpsc::channel同时运行。我目前的方法是调用into_stream()调用recv(),然后调用for_each_concurrent这个包装的对象。据我了解,这应该在消息进入时同时执行代码,而不是按顺序执行(如您使用时recv())。

下面的示例省略了不相关的细节,但总体思路应该很容易看到。

let upload_id = s3_response.upload_id.unwrap();
let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
    let txb1 = txb.clone();
    match part {
        Some((path, part_number)) => {
            let opts = options();
                         //  ...
                    let _ = fs::remove_file(path);
                    let _ = txb1.send((completed_part, upload_id.clone())).await;
                    log::info!("Finished uploading part: {}", part_number);
                }
                None => {
                    log::info!("Finished uploading part files");
                }
            }
        });

这会导致编译器错误,提示无法将事物 (upload_id和) 移入闭包中。我已经尝试了针对这些问题提出的各种更普遍的方法,但似乎没有一种方法适用于这种特定情况。我在这里错过了什么重要的东西吗?示例错误如下。txbFnMut

          let upload_id = s3_response.upload_id.unwrap();
    |                --------- captured outer variable
...
131 |            let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
    |  ___________________________________________________________________-_________________^
    | | __________________________________________________________________|
    | ||
132 | ||             let uid = upload_id.clone();
    | ||                       ---------
    | ||                       |
    | ||                       move occurs because `upload_id` has type `String`, which does not implement the `Copy` trait
    | ||                       move occurs due to use in generator
133 | ||             let txb1 = txb.clone();
134 | ||             match part {
...   ||
161 | ||             }
162 | ||         });
    | ||         ^
    | ||_________|
    | |__________captured by this `FnMut` closure
    |            move out of `upload_id` occurs here

error[E0507]: cannot move out of `txb`, a captured variable in an `FnMut` closure
   --> src/lib.rs:131:84
    |
45  |        let (txb, mut rxb) = mpsc::channel(1); // builder channel
    |             --- captured outer variable
...
131 |            let _ = rxu.recv().into_stream().for_each_concurrent(10, |part| async move {
    |  ___________________________________________________________________-_________________^
    | | __________________________________________________________________|
    | ||
132 | ||             let uid = upload_id.clone();
133 | ||             let txb1 = txb.clone();
    | ||                        ---
    | ||                        |
    | ||                        move occurs because `txb` has type `tokio::sync::mpsc::Sender<(CompletedPart, String)>`, which does not implement the `Copy` trait
    | ||                        move occurs due to use in generator
134 | ||             match part {
...   ||
161 | ||             }
162 | ||         });
    | ||         ^
    | ||_________|
    | |__________captured by this `FnMut` closure
    |            move out of `txb` occurs here

任何帮助深表感谢。谢谢!

4

0 回答 0