13

我正在尝试使用 Rayon's 优化我的功能par_iter()

单线程版本类似于:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {

    let result = txs.iter().map(|tx| {

         tx.verify_and_store(store)

    }).collect();

    ...
}

每个Store实例只能由一个线程使用,但Store可以同时使用多个实例,所以我可以通过clone-ing使这个多线程store

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {

    let result = txs.par_iter().map(|tx| {

         let mut local_store = store.clone();

         tx.verify_and_store(&mut local_store)

    }).collect();

    ...
}

store但是,这会在每次迭代时克隆,这太慢了。我想每个线程使用一个商店实例。

人造丝可以吗?还是我应该求助于手动线程和工作队列?

4

2 回答 2

8

老问题,但我觉得答案需要重新审视。一般来说,有两种方法:

使用map_with. 每次线程从另一个线程窃取工作项时,这将克隆。这可能会克隆比线程更多的商店,但它应该相当低。如果克隆太昂贵,您可以增加 rayon 的大小,将使用with_min_len.

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    let result = txs.iter().map_with(|| store.clone(), |store, tx| {
         tx.verify_and_store(store)
    }).collect();
    ...
}

或者使用ThreadLocalthread_local crate中的作用域。这将确保您只使用与线程一样多的对象,并且一旦ThreadLocal对象超出范围,它们就会被销毁。

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    let tl = ThreadLocal::new();
    let result = txs.iter().map(|tx| {
         let store = tl.get_or(|| Box::new(RefCell::new(store.clone)));
         tx.verify_and_store(store.get_mut());
    }).collect();
    ...
}
于 2019-08-11T18:46:54.413 回答
8

可以使用线程局部变量来确保local_store在给定线程中不会多次创建它。

例如,这会编译(完整源代码):

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use std::cell::RefCell;
    thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));

    let mut result = Vec::new();

    txs.par_iter().map(|tx| {
        STORE.with(|cell| {
            let mut local_store = cell.borrow_mut();
            if local_store.is_none() {
                *local_store = Some(store.clone());
            }
            tx.verify_and_store(local_store.as_mut().unwrap())
        })
    }).collect_into(&mut result);
}

然而,这段代码有两个问题。第一,如果克隆store需要在完成时做某事par_iter(),例如刷新缓冲区,它根本不会发生——它们Drop只会在 Rayon 的工作线程退出时被调用,即使这样也不能保证

第二个也是更严重的问题是store每个工作线程只创建一次的克隆。如果 Rayon 缓存了它的线程池(我相信它确实如此),这意味着以后不相关的调用verify_and_store将继续使用 的最后一个已知克隆store,这可能与当前存储无关。

这可以通过使代码稍微复杂化来纠正:

  • 将克隆的变量存储在 aMutex<Option<...>>而不是 中Option,以便调用的线程可以访问它们par_iter()。这将在每次访问时产生一个互斥锁,但该锁将是无争议的,因此很便宜。

  • 在互斥体周围使用 anArc以收集对向量中创建的商店克隆的引用。None该向量用于通过在迭代完成后将它们重置来清理存储。

  • 将整个调用包装在一个不相关的互斥体中,以便两个并行调用verify_and_store最终不会看到彼此的存储克隆。(如果在迭代之前创建并安装了一个新的线程池,这可能是可以避免的。)希望这种序列化不会影响 的性能verify_and_store,因为每个调用都会使用整个线程池。

结果并不漂亮,但它可以编译,只使用安全代码,并且似乎可以工作:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use std::sync::{Arc, Mutex};
    type SharedStore = Arc<Mutex<Option<Store>>>;

    lazy_static! {
        static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
        static ref NO_REENTRY: Mutex<()> = Mutex::new(());
    }
    thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));

    let mut result = Vec::new();
    let _no_reentry = NO_REENTRY.lock();

    txs.par_iter().map({
        |tx| {
            STORE.with(|arc_mtx| {
                let mut local_store = arc_mtx.lock().unwrap();
                if local_store.is_none() {
                    *local_store = Some(store.clone());
                    STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
                }
                tx.verify_and_store(local_store.as_mut().unwrap())
            })
        }
    }).collect_into(&mut result);

    let mut store_clones = STORE_CLONES.lock().unwrap();
    for store in store_clones.drain(..) {
        store.lock().unwrap().take();
    }
}
于 2017-03-07T19:13:11.227 回答