18

语境

我有一个情况,多个线程必须更新存储在共享向量中的对象。但是向量很大,要更新的元素数量比较少。

问题

在一个最小的示例中,要更新的元素集可以由包含要更新的元素的索引的(散列)集来标识。因此,代码可能如下所示:

let mut big_vector_of_elements = generate_data_vector();

while has_things_to_do() {
    let indices_to_update = compute_indices();
    indices_to_update.par_iter() // Rayon parallel iteration
       .map(|index| big_vector_of_elements[index].mutate())
       .collect()?;
}

这在 Rust 中显然是不允许的:big_vector_of_elements不能同时在多个线程中可变地借用。但是,将每个元素包装在例如Mutex锁中似乎是不必要的:这种特定情况在没有显式同步的情况下是安全的。由于索引来自一组,因此它们保证是不同的。没有两次迭代par_iter接触向量的同一个元素。

重申我的问题

编写一个并行改变向量中元素的程序的最佳方法是什么,其中同步已经通过选择索引来处理,但是编译器不理解后者?

一个接近最佳的解决方案是将所有元素包装big_vector_of_elements在某个假设的UncontendedMutex锁中,这将是一种变体,Mutex在无竞争的情况下速度快得离谱,并且在发生争用(甚至恐慌)时可能需要任意长的时间。理想情况下,对于 any , anUncontendedMutex<T>也应该与 , 具有相同的大小和对齐方式。TT

相关但不同的问题:

多个问题可以用“使用 Rayon 的并行迭代器”、“使用chunks_mut”或“使用split_at_mut”来回答:

这些答案在这里似乎无关紧要,因为这些解决方案意味着迭代整个big_vector_of_elements,然后为每个元素确定是否需要更改任何内容。从本质上讲,这意味着这样的解决方案如下所示:

let mut big_vector_of_elements = generate_data_vector();

while has_things_to_do() {
    let indices_to_update = compute_indices();
    for (index, mut element) in big_vector_of_elements.par_iter().enumerate() {
        if indices_to_update.contains(index) {
            element.mutate()?;
        }
    }
}

该解决方案所花费的时间与 的大小成正比big_vector_of_elements,而第一个解决方案仅在与 的大小成比例的多个元素上循环indices_to_update

4

5 回答 5

7

您可以通过调用来排序indices_to_update和提取可变引用split_*_mut

let len = big_vector_of_elements.len();

while has_things_to_do() {
    let mut tail = big_vector_of_elements.as_mut_slice();

    let mut indices_to_update = compute_indices();
    // I assumed compute_indices() returns unsorted vector
    // to highlight the importance of sorted order
    indices_to_update.sort();

    let mut elems = Vec::new();

    for idx in indices_to_update {
        // cut prefix, so big_vector[idx] will be tail[0]
        tail = tail.split_at_mut(idx - (len - tail.len())).1;

        // extract tail[0]
        let (elem, new_tail) = tail.split_first_mut().unwrap();
        elems.push(elem);

        tail = new_tail;
    }
}

仔细检查此代码中的所有内容;我没有测试它。然后你可以打电话elems.par_iter(...)或其他什么。

于 2019-05-02T08:40:29.983 回答
6

当编译器不能强制对切片元素的可变引用不是排他的时,Cell这非常好。

您可以将 a&mut [T]转换为&Cell<[T]>using Cell::from_mut,然后将 a&Cell<[T]>转换为&[Cell<T>]using Cell::as_slice_of_cells。所有这些都是零成本的:它只是用来指导类型系统。

A&[Cell<T>]就像 a &[mut T],如果可以这样写的话:对可变元素切片的共享引用。您可以使用Cells 执行的操作仅限于读取或替换 - 您无法获得对包装元素本身的引用,无论是否可变。Rust 也知道这Cell不是线程安全的(它没有实现Sync)。这保证了一切都是安全的,没有动态成本。

fn main() {
    use std::cell::Cell;

    let slice: &mut [i32] = &mut [1, 2, 3];
    let cell_slice: &Cell<[i32]> = Cell::from_mut(slice);
    let slice_cell: &[Cell<i32>] = cell_slice.as_slice_of_cells();
    
    let two = &slice_cell[1];
    let another_two = &slice_cell[1];

    println!("This is 2: {:?}", two);
    println!("This is also 2: {:?}", another_two);
    
    two.set(42);
    println!("This is now 42!: {:?}", another_two);
}
于 2020-06-24T22:16:27.900 回答
3

我认为这是使用unsafe代码的合理场所。逻辑本身是安全的,但不能被编译器检查,因为它依赖于类型系统之外的知识(的契约BTreeSet,它本身依赖于的实现Ord和朋友usize)。

在此示例中,我们通过 抢先检查所有索引range,因此每次调用都add可以安全使用。由于我们采用了一个集合,我们知道所有的索引都是不相交的,所以我们没有引入可变别名。从切片中获取原始指针很重要,以避免切片本身和返回值之间出现混叠。

use std::collections::BTreeSet;

fn uniq_refs<'i, 'd: 'i, T>(
    data: &'d mut [T],
    indices: &'i BTreeSet<usize>,
) -> impl Iterator<Item = &'d mut T> + 'i {
    let start = data.as_mut_ptr();
    let in_bounds_indices = indices.range(0..data.len());

    // I copied this from a Stack Overflow answer
    // without reading the text that explains why this is safe
    in_bounds_indices.map(move |&i| unsafe { &mut *start.add(i) })
}

use std::iter::FromIterator;

fn main() {
    let mut scores = vec![1, 2, 3];

    let selected_scores: Vec<_> = {
        // The set can go out of scope after we have used it.
        let idx = BTreeSet::from_iter(vec![0, 2]);
        uniq_refs(&mut scores, &idx).collect()
    };

    for score in selected_scores {
        *score += 1;
    }

    println!("{:?}", scores);
}

使用此函数查找所有单独的可变引用后,您可以使用 Rayon 并行修改它们:

use rayon::prelude::*; // 1.0.3

fn example(scores: &mut [i32], indices: &BTreeSet<usize>) {
    let selected_scores: Vec<_> = uniq_refs(scores, indices).collect();
    selected_scores.into_par_iter().for_each(|s| *s *= 2);

    // Or

    uniq_refs(scores, indices).par_bridge().for_each(|s| *s *= 2);
}

您可能希望考虑使用 bitset 而不是 aBTreeMap来提高效率,但此答案仅使用标准库。

也可以看看:

于 2019-05-06T16:39:44.003 回答
2

由于我一直在处理类似的问题,所以这是我的解决方案,除非绝对必要,否则我不建议使用:

struct EvilPtr<T> {
    ptr: *mut T,
}
impl<T> EvilPtr<T> {
    fn new(inp: &mut T) -> Self {
        EvilPtr { ptr: inp as *mut T }
    }
    unsafe fn deref(&self) -> *mut T {
        return self.ptr;
    }
}

unsafe impl<T> Sync for EvilPtr<T> {}
unsafe impl<T> Send for EvilPtr<T> {}

现在你可以这样做:

let indices: [usize; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let mut arr: [i32; 10] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
let e = EvilPtr::new(&mut arr[0]);
unsafe {
    indices.par_iter().for_each(|x: &usize| {
        *e.deref().add(*x) += *x as i32;
    });
}
println!("{:?}", arr);

如果您绝对需要这样做,我建议您将其隐藏在一些用户友好的界面下,您可以确保不会出现错误。

于 2020-10-23T15:19:52.250 回答
1

我有一个相关的问题。我需要并行分配给二维数组的任意列。我使用了 ndarray myarray。axis_chunks_iter_mut(nd::Axis(1), 1)遍历每一列。

于 2021-01-14T22:07:09.283 回答