1

此代码运行完成并打印出my_data如果我取消注释函数中的sleep行的do_work值。如果我把它注释掉,我的可执行文件每次都会挂起。

为什么 Condvar 不唤醒最后一个线程?提到收集句柄并等待它们加入主线程,但这应该由人造丝作用域处理,对吗?

sleep如果没有 in 中的语句,我如何才能完成此代码do_work()

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Barrier, Condvar, Mutex,
    },
    thread,
    time::Duration,
};

fn do_work(
    mtx: Arc<Mutex<bool>>,
    cond_var: Arc<Condvar>,
    barrier: Arc<Barrier>,
    quitting: &AtomicBool,
    range: &mut [f32],
) {
    while !quitting.load(Ordering::SeqCst) {
        barrier.wait();
        //thread::sleep(Duration::from_micros(1));
        let mut started = mtx.lock().unwrap();
        while !*started && !quitting.load(Ordering::SeqCst) {
            started = cond_var.wait(started).unwrap();
        }
        if quitting.load(Ordering::SeqCst) {
            break;
        } else {
            range.iter_mut().for_each(|i| *i += 1.0);
        }
    }
    println!("{:?} Joining", thread::current().id());
}

fn start_work(mtx: Arc<Mutex<bool>>, cond_var: Arc<Condvar>) {
    let mut started = mtx.lock().unwrap();
    *started = true;
    cond_var.notify_all();
}

fn reset_work(mtx: Arc<Mutex<bool>>) {
    let mut started = mtx.lock().unwrap();
    *started = false;
}
fn main() {
    let num_threads = 4;
    let test_barrier = Arc::new(Barrier::new(num_threads + 1));

    let test_mutex = Arc::new(Mutex::new(false));
    let test_cond_var = Arc::new(Condvar::new());

    let mut my_data = vec![0.0; 1024];
    my_data
        .iter_mut()
        .enumerate()
        .for_each(|(i, iter)| *iter = i as f32);
    let chunk_size = my_data.len() / num_threads;
    let quitting = AtomicBool::new(false);
    rayon::scope(|s| {
        for chunk in my_data.chunks_mut(chunk_size) {
            let thread_mtx = test_mutex.clone();
            let thread_cond_var = test_cond_var.clone();
            let thread_barrier = Arc::clone(&test_barrier);
            let temp = &quitting;
            s.spawn(move |_| do_work(thread_mtx, thread_cond_var, thread_barrier, &temp, chunk));
        }
        test_barrier.wait();
        let _upper_bound = 1024 / num_threads;
        for _i in 0..10 {
            start_work(test_mutex.clone(), test_cond_var.clone());
            test_barrier.wait();
            reset_work(test_mutex.clone());
        }
        quitting.store(true, Ordering::SeqCst);
    });
    println!("my_data is: {:?}", my_data);
}

Cargo.toml 依赖项:

rayon = "*"

这是对do_work稍后将进行的更复杂数学的测试,但我试图获得一系列成功修改较大的Vec.

4

1 回答 1

0

我让预期的行为按预期工作。这似乎特别令人费解,并且应该有更好的方法。我很乐意接受一个比我所拥有的答案更复杂的答案,但它至少会产生所需的行为,一个带有预旋转线程的线程池,只要它从主线程接收到正确的信号并且可以确定性地关闭,就会产生突发工作方法。额外的开始和结束线程提供了一个握手机制,以确保没有竞争条件出现在障碍上。

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Barrier,
    },
    thread,
};

fn do_work(
    start_barrier: &Barrier,
    finish_barrier: &Barrier,
    quitting: &AtomicBool,
    starting: &AtomicBool,
    finishing: &AtomicBool,
    range: &mut [f32],
) {
    while !quitting.load(Ordering::SeqCst) {
        start_barrier.wait();
        while !starting.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {}
        {
            // let mut started = mtx.lock().unwrap();
            // while !*started && !quitting.load(Ordering::SeqCst) {
            //     started = cond_var.wait(started).unwrap();
            // }
        }
        if quitting.load(Ordering::SeqCst) {
            break;
        } else {
            range.iter_mut().for_each(|i| *i += 1.0);
        }
        finish_barrier.wait();
        while !finishing.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {}
    }
    println!("{:?} Joining", thread::current().id());
}

fn main() {
    let num_threads = 4;
    let start_barrier = Barrier::new(num_threads + 1);
    let finish_barrier = Barrier::new(num_threads + 1);
    let mut my_data = vec![0.0; 1024];
    my_data
        .iter_mut()
        .enumerate()
        .for_each(|(i, iter)| *iter = i as f32);
    let chunk_size = my_data.len() / num_threads;
    let quitting = AtomicBool::new(false);
    let starting = AtomicBool::new(false);
    let finishing = AtomicBool::new(false);


    rayon::scope(|s| {
        for chunk in my_data.chunks_mut(chunk_size) {
            let thread_start_barrier = &start_barrier;
            let thread_finish_barrier = &finish_barrier;

            let thread_quitting = &quitting;
            let thread_starting = &starting;
            let thread_finishing = &finishing;

            s.spawn(move |_| do_work(   thread_start_barrier,
                                        thread_finish_barrier,
                                        thread_quitting, 
                                        thread_starting,
                                        thread_finishing,
                                        chunk));
        }
        let num_rounds = 10;
        for i in 0..num_rounds {
            let start = std::time::Instant::now();
            start_barrier.wait();
            finishing.store(false, Ordering::SeqCst);
            starting.store(true, Ordering::SeqCst);
            finish_barrier.wait();
            if i == num_rounds-1 {
                quitting.store(true, Ordering::SeqCst);
            }
            finishing.store(true, Ordering::SeqCst);
            starting.store(false, Ordering::SeqCst);
            println!("Round {} took: {:?}", i, std::time::Instant::now() - start);
        }
    });
    println!("my_data is: {:?}", my_data);
}
于 2022-02-16T21:05:03.273 回答