4

我一直在研究一个函数,它将使用 Rust 和线程将一堆文件从源复制到目标。我在让线程共享迭代器时遇到了一些麻烦。我还不习惯借阅系统:

extern crate libc;
extern crate num_cpus;

use libc::{c_char, size_t};
use std::thread;
use std::fs::copy;

fn python_str_array_2_str_vec<T, U, V>(_: T, _: U) -> V {
    unimplemented!()
}

#[no_mangle]
pub extern "C" fn copyFiles(
    sources: *const *const c_char,
    destinies: *const *const c_char,
    array_len: size_t,
) {
    let src: Vec<&str> = python_str_array_2_str_vec(sources, array_len);
    let dst: Vec<&str> = python_str_array_2_str_vec(destinies, array_len);
    let mut iter = src.iter().zip(dst);
    let num_threads = num_cpus::get();
    let threads = (0..num_threads).map(|_| {
        thread::spawn(|| while let Some((s, d)) = iter.next() {
            copy(s, d);
        })
    });
    for t in threads {
        t.join();
    }
}

fn main() {}

我收到了我无法解决的编译错误:

error[E0597]: `src` does not live long enough
  --> src/main.rs:20:20
   |
20 |     let mut iter = src.iter().zip(dst);
   |                    ^^^ does not live long enough
...
30 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

error[E0373]: closure may outlive the current function, but it borrows `**iter`, which is owned by the current function
  --> src/main.rs:23:23
   |
23 |         thread::spawn(|| while let Some((s, d)) = iter.next() {
   |                       ^^                          ---- `**iter` is borrowed here
   |                       |
   |                       may outlive borrowed value `**iter`
   |
help: to force the closure to take ownership of `**iter` (and any other referenced variables), use the `move` keyword, as shown:
   |         thread::spawn(move || while let Some((s, d)) = iter.next() {

我已经看到了以下问题:

当使用多个我不使用的线程时,值的寿命不够长chunks,我想尝试通过线程共享一个迭代器,尽管创建块以将它们传递给线程将是经典的解决方案。

无法在线程之间发送 &str 因为它的寿命不够长 我已经看到了一些使用通道与线程通信的答案,但我不太确定使用它们。应该有一种更简单的方法来通过线程共享一个对象。

为什么局部变量对于 thread::scoped 的寿命不够长 这引起了我的注意,scoped应该可以解决我的错误,但是由于它位于不稳定的通道中,我想看看是否有另一种方法可以使用spawn.

有人可以解释我应该如何修复生命周期以便可以从线程访问迭代器吗?

4

1 回答 1

10

这是您的问题的一个最小的、可重现的示例

use std::thread;

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];
    let mut iter = src.iter().zip(dst);
    thread::spawn(|| {
        while let Some((s, d)) = iter.next() {
            println!("{} -> {}", s, d);
        }
    });
}

有多个相关问题:

  1. 迭代器位于堆栈上,线程的闭包引用它。
  2. 闭包接受对迭代器的可变引用。
  3. Vec迭代器本身具有对位于堆栈上的a 的引用。
  4. Vec本身有对可能存在于堆栈上的字符串切片的引用,但不能保证任何一种方式都比线程存在更长的时间。

换句话说,Rust 编译器阻止了你执行四个独立的内存不安全部分。

要认识到的一个主要问题是,您生成的任何线程都可能比您生成它的地方寿命更长。即使您立即调用join,编译器也无法静态验证是否会发生,因此它必须采取保守路径。这就是作用域线程的意义——它们保证线程在它们开始的堆栈帧之前退出。

此外,您正尝试在多个并发线程中使用可变引用。保证可以安全地并行调用迭代器(或构建它的任何迭代器)。两个线程完全有可能同时next调用。这两段代码并行运行并写入相同的内存地址。一个线程写入一半数据,另一个线程写入另一半,现在您的程序在未来的某个任意时间点崩溃。

使用crossbeam 之类的工具,您的代码将类似于:

use crossbeam; // 0.7.3

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];

    let mut iter = src.iter().zip(dst);
    while let Some((s, d)) = iter.next() {
        crossbeam::scope(|scope| {
            scope.spawn(|_| {
                println!("{} -> {}", s, d);
            });
        })
        .unwrap();
    }
}

如前所述,这一次只会产生一个线程,等待它完成。获得更多并行性的另一种方法(本练习的通常要点)是交换对next和的调用spawn。这需要通过关键字将所有权转移sd线程:move

use crossbeam; // 0.7.3

fn main() {
    let src = vec!["one", "alpha"];
    let dst = vec!["two", "beta"];

    let mut iter = src.iter().zip(dst);
    crossbeam::scope(|scope| {
        while let Some((s, d)) = iter.next() {
            scope.spawn(move |_| {
                println!("{} -> {}", s, d);
            });
        }
    })
    .unwrap();
}

如果在 中添加 sleep 调用spawn,您可以看到线程并行运行。

但是,我会使用for循环编写它:

let iter = src.iter().zip(dst);
crossbeam::scope(|scope| {
    for (s, d) in iter {
        scope.spawn(move |_| {
            println!("{} -> {}", s, d);
        });
    }
}).unwrap();

最后,迭代器在当前线程上运行,然后从迭代器返回的每个值都被移交给一个新线程。保证新线程在捕获的引用之前退出。

您可能对Rayon感兴趣,这是一个允许某些类型的迭代器轻松并行化的板条箱。

也可以看看:

于 2017-07-26T13:05:39.357 回答