0

我有一些涉及使用嵌套循环的多线程代码,其中内部循环并行运行。每个线程之间的“共享”是一个Sender将返回结果的。值得注意的是,因此Sender克隆Send它并使用人造丝发送它应该没有问题for_each_with()。但是,编译此代码:

use std::sync::mpsc::channel;
use rayon::prelude::*;

fn main(){
    let (sender, receiver) = channel();

    (0..5).for_each(|i|{
        (0..5).into_par_iter().for_each_with(&sender, |sender, j|{
            sender.send(i + j).unwrap();   
        });
    });
}

给我:

8 |         (0..5).into_par_iter().for_each_with(&sender, |sender, j|{
  |                                              ^^^^^^^ `Sender<_>` cannot be shared between threads safely
  |
  = help: the trait `Sync` is not implemented for `Sender<_>`
  = note: required because of the requirements on the impl of `Send` for `&Sender<_>`

游乐场

现在我在想这可能是因为我试图克隆一个引用,但是如果我将实际移动senderfor_each_with()(ie for_each_with(sender, ...)) 中,它将被外部循环的第一次迭代消耗:

error[E0507]: cannot move out of `sender`, a captured variable in an `FnMut` closure

游乐场)。

如何以满足 Rust 编译器的方式实现此模式?

4

2 回答 2

1

AFAIK,人造丝使用线程池,这意味着人造丝需要执行该项目,Send因为它会将其发送到线程。人造丝不会为每个项目克隆,而是为每个线程克隆:

fn for_each_with<OP, T>(self, init: T, op: OP) where
    OP: Fn(&mut T, Self::Item) + Sync + Send,
    T: Send + Clone,

我们可以看到OPtake a &mut Tnot a T。这意味着for_each_with()克隆用于每个线程数而不是生成的项目数。

参考需要实现Sync来实现SendSender被定义为不实现Sync。反正我不知道这个选择的细节,这意味着&Sender不能在线程之间共享。我认为没有解决方案可以消除这种约束。

但是,如果您愿意,可以使用实现的横梁通道Sync

use crossbeam_channel::unbounded; // 0.5.1
use rayon::prelude::*; // 1.5.1

fn main() {
    let (sender, _receiver) = unbounded();

    for i in 0..5 {
        (0..5).into_par_iter().for_each_with(&sender, |sender, j| {
            sender.send(i + j).unwrap();
        });
    }
}

会编译得很好。好处是横梁通道声称更快。也就是说,克隆对于 std 来说是完全可以的Sender

use rayon::prelude::*;
use std::sync::mpsc::channel;

fn main() {
    let (sender, _receiver) = channel();

    for i in 0..5 {
        let sender = sender.clone();
        (0..5).into_par_iter().for_each_with(sender, |sender, j| {
            sender.send(i + j).unwrap();
        });
    }
}

那确实会克隆O(n)更多时间,但从Senderstd 意味着要克隆很多。(实际上它可能只是添加一个克隆,因为您选择创建一个嵌套循环,代码可能不会克隆最后一个,而只是给它最后一个线程,所以您只需克隆一个太多,我们可以测试

无论如何,您所有的问题都来自一种奇怪的情况,应该将迭代变平,例如:

use rayon::prelude::*;
use std::sync::mpsc::channel;

fn main() {
    let (sender, _recever) = channel();

    (0..5)
        .into_par_iter()
        .flat_map_iter(|i| (0..5).map(move |j| (i, j)))
        .for_each_with(sender, |sender, (i, j)| {
            sender.send(i + j).unwrap();
        });
}

您也可以考虑不使用Sender人造丝可能意味着使用,collect()而不是发送物品,您可以在最后收集它们。

于 2021-08-29T06:01:51.220 回答
-1

Sender被克隆,它的字面意思是在文档中。每个子线程都必须有自己的发送者。

于 2021-08-28T15:30:30.610 回答