可以使用线程局部变量来确保local_store
在给定线程中不会多次创建它。
例如,这会编译(完整源代码):
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
use std::cell::RefCell;
thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));
let mut result = Vec::new();
txs.par_iter().map(|tx| {
STORE.with(|cell| {
let mut local_store = cell.borrow_mut();
if local_store.is_none() {
*local_store = Some(store.clone());
}
tx.verify_and_store(local_store.as_mut().unwrap())
})
}).collect_into(&mut result);
}
然而,这段代码有两个问题。第一,如果克隆store
需要在完成时做某事par_iter()
,例如刷新缓冲区,它根本不会发生——它们Drop
只会在 Rayon 的工作线程退出时被调用,即使这样也不能保证。
第二个也是更严重的问题是store
每个工作线程只创建一次的克隆。如果 Rayon 缓存了它的线程池(我相信它确实如此),这意味着以后不相关的调用verify_and_store
将继续使用 的最后一个已知克隆store
,这可能与当前存储无关。
这可以通过使代码稍微复杂化来纠正:
将克隆的变量存储在 aMutex<Option<...>>
而不是 中Option
,以便调用的线程可以访问它们par_iter()
。这将在每次访问时产生一个互斥锁,但该锁将是无争议的,因此很便宜。
在互斥体周围使用 anArc
以收集对向量中创建的商店克隆的引用。None
该向量用于通过在迭代完成后将它们重置来清理存储。
将整个调用包装在一个不相关的互斥体中,以便两个并行调用verify_and_store
最终不会看到彼此的存储克隆。(如果在迭代之前创建并安装了一个新的线程池,这可能是可以避免的。)希望这种序列化不会影响 的性能verify_and_store
,因为每个调用都会使用整个线程池。
结果并不漂亮,但它可以编译,只使用安全代码,并且似乎可以工作:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
use std::sync::{Arc, Mutex};
type SharedStore = Arc<Mutex<Option<Store>>>;
lazy_static! {
static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
static ref NO_REENTRY: Mutex<()> = Mutex::new(());
}
thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));
let mut result = Vec::new();
let _no_reentry = NO_REENTRY.lock();
txs.par_iter().map({
|tx| {
STORE.with(|arc_mtx| {
let mut local_store = arc_mtx.lock().unwrap();
if local_store.is_none() {
*local_store = Some(store.clone());
STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
}
tx.verify_and_store(local_store.as_mut().unwrap())
})
}
}).collect_into(&mut result);
let mut store_clones = STORE_CLONES.lock().unwrap();
for store in store_clones.drain(..) {
store.lock().unwrap().take();
}
}