0

在尝试编写一些并行代码时,我不断遇到如下错误。我找到了一些解决方案,但它们都涉及我不想做的锁定。有什么办法可以解决这个问题吗?

pub trait PermBrute {
    fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
        let mut vec : Vec<(f64, Vec<usize>)> = Vec::new();
        let results = Arc::new(Mutex::new(vec));
        let mut threads = vec![];

        for i in 0..*CPUS {
            threads.push( thread::spawn({
                let clone = Arc::clone(&results);
                let text = ciphertext.clone();
                move || {
                    // some code here
                    let hold = self.decrypt( )
                    // some more code here

                    let mut v = clone.lock().unwrap();
                    v.push(best_key);
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        let lock = Arc::try_unwrap(results).expect("Lock still has multiple owners");
        let mut hold = lock.into_inner().expect("Mutex cannot be locked");

        // do some stuff with hold and return out

        return out;
    }

    fn decrypt( &self, ciphertext : &String, key : &Vec<usize>) -> String;
}
error[E0277]: `Self` cannot be shared between threads safely
   --> src/ciphers/cipher.rs:131:27
    |
108 |     fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
    |                                                                            - help: consider further restricting `Self`: `where Self: std::marker::Sync`
...
131 |             threads.push( thread::spawn({
    |                           ^^^^^^^^^^^^^ `Self` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `Self`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&Self`
    = note: required because it appears within the type `[closure@src/ciphers/cipher.rs:140:17: 164:18 text:std::string::String, start_len:usize, end_len:usize, count:usize, start_points_clone:std::vec::Vec<usize>, local_self:&Self, end:usize, clone:std::sync::Arc<std::sync::Mutex<std::vec::Vec<(f64, std::vec::Vec<usize>)>>>]`
4

2 回答 2

3

使用rayon crate,这可以使用并行迭代器技术来完成。

pub trait PermBrute {
    fn quadgram(&self, max_len: usize, ciphertext: &String) -> Vec<usize> {
        let mut vec: Vec<(f64, Vec<usize>)> = Vec::new();
        let mut threads = vec![];
        
        let best_keys: Vec<_> = (0..*CPUS)
            .into_par_iter()
            .map(|i| {
                // some code here
                // you can access `ciphertext` here directly without copying
                todo!();
                // some more code here
                
                best_key
            })
            .collect();

        // do some stuff with best_keys and return out

        return out;
    }
    fn decrypt(&self, ciphertext: &String, key: &Vec<usize>) -> String;
}
于 2020-09-28T09:52:39.207 回答
1

我花了一些时间来修改您的代码,以便我可以在 rust 操场上对其进行测试。这是修改后的源代码:

use std::sync::{Arc, Mutex};
use std::thread;

pub trait PermBrute {
    fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
        let mut vec : Vec<(f64, Vec<usize>)> = Vec::new();
        let results = Arc::new(Mutex::new(vec));
        let mut threads = vec![];

        for i in 0..10 {
            threads.push( thread::spawn({
                let clone = Arc::clone(&results);
                let text = ciphertext.clone();
                move || {
                    // some code here
                    let hold = self.decrypt( &String::new(), &vec![] );
                    // some more code here

                    let mut v = clone.lock().unwrap();
                    // v.push(best_key);
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        let lock = Arc::try_unwrap(results).expect("Lock still has multiple owners");
        let mut hold = lock.into_inner().expect("Mutex cannot be locked");

        // do some stuff with hold and return out

        // return out;
        unimplemented!()
    }

    fn decrypt(&self, ciphertext: &String, key: &Vec<usize>) -> String;
}

首先,您可以Self通过以下方式进行限制:

pub trait PermBrute: Sync {}

然后,rustc开始关心生命周期:

(错误太长,然后我正在使用playground

这篇文章应该回答你的问题。总之,threads都是后台衍生出来的,而且rustc还是很傻,不理你join的s。有类似Arc<Self>或的解决方法AtomicPtr<Self>


更新

让我们从一个最小的例子开始:

use std::thread;

fn try_to_spawn() {
    let x: String = "5".to_string();
    let j = thread::spawn(|| {
        println!("{}", x.len());
    });
    j.join().unwrap();
}

在这里,rustc说:

error[E0373]: closure may outlive the current function, but it borrows `x`, which is owned by the current function
 --> src/lib.rs:5:27
  |
5 |     let j = thread::spawn(|| {
  |                           ^^ may outlive borrowed value `x`
6 |         println!("{}", x.len());
  |                        - `x` is borrowed here
  |

help: to force the closure to take ownership of `x` (and any other referenced variables), use the `move` keyword
  |
5 |     let j = thread::spawn(move || {
  |                           ^^^^

这里rustc抱怨所借的寿命xrustc认为:由于产生了一个线程并将在后台运行,因此它可以在函数try_to_spawn退出之前或之后终止,因此在执行x时可能会悬空x.len()

但是很明显,我们join在函数结束时编辑了线程,并且我们x的寿命肯定足够长(当然,'static从人类的角度来看,寿命不是必需的)。但是,rustc它仍然太愚蠢,无法理解人类,而且它对我们的一些事情一无所知join!。

可以x关闭,而不是借用它。但是,在以后的时间里将无法使用x。要以“安全”的方式解决问题,您可以使用Arc<String>

use std::thread;
use std::sync::Arc;

fn try_to_spawn() {
    let x: Arc<String> = Arc::new("5".to_string());
    let x_clone: Arc<String> = x.clone();
    let j = thread::spawn(move || {
        println!("{}", x_clone.len());
    });
    j.join().unwrap();
    println!("{}", x.len());
}

Arc有开销。一个人可能想要使用指针*const String或者*mut String为了避免生命周期检查——但是原始指针不是Send/Sync并且不能转移到 a thread。要通过线程之间的指针共享资源,您必须使用AtomicPtrhere is an discussion about making raw pointers to Send + Sync)。


那么回到这个问题,你的self(类型&Self)呢?当然,也是一个参考!而且rustc也无法弄清楚它的“真实寿命”:

use std::thread;
use std::sync::Arc;

struct S;

impl S {
    fn try_to_spawn(&self) {
        let j = thread::spawn(|| {
            self.do_something();
        });
        j.join().unwrap();
    }
    
    fn do_something(&self) {}
}

产生错误信息:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/lib.rs:8:31
   |
8  |           let j = thread::spawn(|| {
   |  _______________________________^
9  | |             self.do_something();
10 | |         });
   | |_________^
   |

这个看起来不像之前的生命周期错误,但更类似于您的代码中发生的错误。要再次解决此问题,您可以使用Arc<Self>

fn try_to_spawn(self: Arc<Self>) {
    let j = thread::spawn(move || {
        self.do_something();
    });
    j.join().unwrap();
}

或使用AtomicPtr<Self>

use std::thread;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::Relaxed;

struct S;

impl S {
    fn try_to_spawn(&self) {
        let self_ptr: AtomicPtr<Self> 
            = AtomicPtr::new(self as *const Self as *mut Self);
        let j = thread::spawn(move || {
            unsafe {
                self_ptr.load(Relaxed) // *mut Self
                    .as_ref()          // Option<&Self>
                    .unwrap()          // &Self
                    .do_something();
            }
        });
        j.join().unwrap();
    }
    
    fn do_something(&self) {}
}

这有效但丑陋。我还建议使用 cratesrayon来执行并行计算。但是,对于您想要thread手动创建 s 的情况,我仍然希望这个答案是有帮助的。

于 2020-09-28T10:45:04.017 回答