1

我设计了一个简单的fifo,它存储a DequePolicy,如果元素太多,它可以从后面删除元素:

use super::deque_policy::DequePolicy;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Condvar;

pub struct BoundedFifo<T> {
    pub deque: VecDeque<T>,
    pub policy: Box<dyn DequePolicy<T>>,
    //pub condvar: Condvar
}

impl<T> BoundedFifo<T> {
    pub fn new(policy: Box<dyn DequePolicy<T>>) -> BoundedFifo<T> {
        BoundedFifo{
            deque: VecDeque::<T>::new(),
            policy: policy,
            //condvar: Condvar::new()
        }
    }
}

impl<T> BoundedFifo<T> {
    pub fn pop_front(&mut self) -> Option<T> {
        self.policy.before_pop_front(&self.deque);
        //self.condvar.notify_all();
        self.deque.pop_front()
    }
    pub fn push_back(&mut self, t: T) {
        self.deque.push_back(t);
        //self.condvar.notify_all();
        self.policy.after_push_back(&mut self.deque);
    }
}

我想在 2 个线程之间共享这个 fifo,但是有一个问题。我希望一个线程能够等待元素出现在 fifo 上。这就是为什么我添加了condvar现在评论的内容。

我评论它是因为我很快意识到要fifo在线程之间共享,我必须把它放在一个Arc<Mutex<>>. 因此,例如,为了等待condvar它在内部fifo: Arc<Mutex<BoundedFifo<u8>>>,我必须将它锁定在一个线程上:fifo.lock().unwrap().condvar.wait_timeout(...). 这基本上可以防止其他线程写入,因此没有必要等待。

然后我就有了这样做的想法:

type Fifo<T> = Arc<(Mutex<BoundedFifo<T>>, Condvar)>;

现在我可以等待一个 condvar,但我还必须记住每当我调用fifo: Fifo<T>'spush_back和时通知这个 condvar pop_front。例如:

let pair = ...
let fifo = pair.0;
let condvar = pair.1;
fifo.push_back(0);
condvar.notify_all();

这很容易出错,因为我很容易忘记调用它。在 C++ 上,它会让我等待共享事物中的某些内容(即使这是不安全的)。

我考虑过创建宏push_back!pop_front!这将push_backpop_front调用 一起调用notify_allcondvar但我觉得这不是很优雅。

我对 Rust 有点陌生,所以我正在为此寻求更好的解决方案。例如,是否有可能实现push_backpop_frontfor Fifo<T>?这样我就避免了宏:slight_smile:。这是一个想法。

4

2 回答 2

2

让你的线程绕过一个Arc<BoundedFifo<T>>. 它将使用内部可变性,并在其面向公众的方法中自行完成所有互斥锁和条件操作,这些方法只需要&self. 您将需要的所有ConditionMutexpolicy方法只需要对它们的不可变引用。

并且不要制作 struct fields pub

我不明白你的policy对象在做什么。

pub struct BoundedFifo<T> {
    deque: Mutex<VecDeque<T>>,
    policy: Box<dyn DequePolicy<T>>,
    condition: Condition,
}

impl<T> BoundedFifo<T> {
    /// pops a value off the front, if one is available
    /// waits only long enough to get the lock
    pub fn pop_front_nonwaiting(&self) -> Option<T> {
        let lock = self.deque.lock().unwrap();
        (*lock).pop_front()
    }

    /// waits until an element is available, then pops it.
    pub fn pop_front_waiting(&self) -> T {

        let lock = self.deque.lock().unwrap();
        let lock = self.condition.wait_while(lock,
                                             |dq| dq.len() == 0));
        self.condition.notify_all();
        // this will be safe because we know len() != 0
        (*lock).pop_front().unwrap()
    }
    pub fn push_back(&self, t: T) {
        let lock = self.deque.lock().unwrap();
        (*lock).push_back(t);
        // overflow/discarding logic goes here
        self.condition.notify_all();
    }
}
于 2020-09-28T06:11:53.587 回答
1

您可以将几乎任何类型包装在结构中,并且新结构将与它包装的类型共享相同的属性(至少在派生适当的特征之后)。

然后,您可以从底层类型公开您需要的方法。

在这种情况下type Fifo<T> = Arc<(Mutex<BoundedFifo<T>>, Condvar)>;,我们可以创建一个struct Fifo<T>(Arc<(Mutex<BoundedFifo<T>>, Condvar)>);. 然后在结构上实现你想要的方法。

需要注意的一件事是需要更改Box<dyn DequePolicy<T>>Box<dyn DequePolicy<T> + Send>允许线程之间的共享。请参阅这个问题:在 Rust 中的线程之间发送特征对象以获取更多信息。

#[derive(Clone)]
struct FiFo<T>(Arc<(Mutex<BoundedFifo<T>>, Condvar)>);

impl<T> FiFo<T> {
    fn push_back(&self, item: T) {
        (self.0).0.lock().unwrap().push_back(item);
        (self.0).1.notify_all();
    }
}

fn main() {
    let mut bounded_fifo = BoundedFifo::new(Box::new(DequePolicyThing));

    let pair = FiFo(Arc::new((Mutex::new(bounded_fifo), Condvar::new())));

    let pair2 = pair.clone();

    std::thread::spawn(move || {
        pair2.push_back("abc".to_owned());
    });

    // The first .0 is to access the Arc inside FiFo, the second is for the Mutex.
    let mut bounded = (pair.0).0.lock().unwrap(); 

    bounded = (pair.0).1.wait(bounded).unwrap();
    println!("Notified {:?}", bounded.pop_front());
}

锈游乐场

于 2020-09-28T04:50:55.577 回答