
I'm reading the source of futures-preview 0.3 to learn how to do "notify anyone" correctly. In mpsc::channel (the bounded one), multiple senders may be waiting for a receipt (in case the buffer is full).

Looking into the implementations of next_message and unpark_one, the receiver seems to notify only one sender per receipt.

I doubt this works in the presence of select!, because select! may lead to false notifications. However, I failed to produce a problem case.

Here's my attempt to confuse mpsc:

[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"

And this:

#![feature(async_await, await_macro, futures_api, pin)]

use std::collections::HashSet;

use futures::prelude::*;

use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;

async fn main2() {
    let channel_len = 1;
    let num_false_wait = 1000;
    let num_expected_messages = 100;

    let (mut send, mut recv) = channel(channel_len);
    // One extra capacity per sender. Fill the extras.
    await!(send.send(-2)).unwrap();

    // Fill buffers
    for _ in 0..channel_len {
        await!(send.send(-1)).unwrap();
    }

    // False waits. Should resolve and produce false waiters.
    for _ in 0..num_false_wait {
        await!(false_wait(&send));
    }

    // True messages.
    {
        let mut send = send.clone();
        await!(send.send(-2)).unwrap();
        tokio::spawn(async move {
            for i in 0..num_expected_messages {
                await!(send.send(i)).unwrap();
            }
            Ok(())
        }.boxed().compat());
    }

    // Drain receiver until all true messages are received.
    let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
    while !expects.is_empty() {
        let i = await!(recv.next()).unwrap();
        expects.remove(&i);
        eprintln!("Received: {}", i);
    }
}

// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
    let (wait_send, wait_recv) = oneshot::channel();
    let mut send = send.clone();
    await!(send.send(-2)).unwrap();
    tokio::spawn(async move {
        let mut sending = send.send(-3);
        let mut fallback = future::ready(());
        select! {
            sending => {
                sending.unwrap();
            },
            fallback => {
                eprintln!("future::ready is selected");
            },
        };
        wait_send.send(()).unwrap();
        Ok(())
    }.boxed().compat());
    await!(wait_recv).unwrap();
}

fn main() {
    tokio::run(async {
        await!(main2());
        Ok(())
    }.boxed().compat());
}

I expected this to happen:

  1. The buffer is filled with -1. Therefore, later senders are blocked.
  2. There are "true waiters" and "false waiters". The false waiters have already given up, because the other arm of select! completes immediately.
  3. On each call to await!(recv.next()), at most one waiting sender is notified. If a false waiter is notified, no one can push to the buffer, even though the buffer has a vacant slot.
  4. If all elements are drained without a true notification, the whole system gets stuck.

Despite my expectation, the main2 async function completed successfully. Why?


2 Answers


Further investigation of the futures source code solved my problem. In the end, I could not confuse the mpsc channel in this way.

The point is that the capacity of an mpsc channel is flexible and can grow beyond the initially specified size. This behavior is mentioned in the docs:

The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.

Yes, I had read this before experimenting, but I couldn't see its importance at the time.

Problem with fixed buffer

Think of a typical bounded queue implementation, where the size of the queue cannot grow beyond the initially specified bound. The spec is this:

  • When the queue is empty, receivers block.
  • When the queue is full (that is, the size is hitting the bound), senders block.

In this situation, if the queue is full, multiple senders are waiting for a single resource (a vacant slot in the queue).

In multithreaded programming, this is accomplished by primitives like notify_one. However, in futures, this is fallible: unlike in multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring the resource (due to constructions like select! or Deadline). Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).

mpsc is flexible

As pointed out above, the buffer size for futures::channel::mpsc::channel isn't strict. The spec is summarized as:

  • When message_queue.len() == 0, receivers block.
  • When message_queue.len() >= buffer, senders may block.
  • When message_queue.len() >= buffer + num_senders, senders block.

Here, num_senders is basically the number of clones of Sender, but more than that in some cases. More precisely, num_senders is the number of SenderTasks.

So, how do we avoid resource sharing? We have additional state for that:

  • Each sender (an instance of SenderTask) has an is_parked boolean state.
  • The channel has another queue called parked_queue, a queue of Arc references to SenderTask.

The channel maintains the following invariants:

  • message_queue.len() <= buffer + num_parked_senders. Note that we don't know the value of num_parked_senders.
  • parked_queue.len() == max(0, message_queue.len() - buffer)
  • Each parked sender has at least one entry in parked_queue.

This is accomplished by the following algorithm:

  • For receiving,
    • it pops off a SenderTask from parked_queue and, if that sender is parked, unparks it.
  • For sending,
    • It always waits for is_parked to be false. If message_queue.len() < buffer, as parked_queue.len() == 0, all senders are unparked. Therefore we can guarantee progress in this case.
    • Once is_parked is false, it pushes the message to the queue, regardless of the current queue length.
    • After that, if message_queue.len() <= buffer, it needs to do nothing further.
    • If message_queue.len() > buffer, the sender is marked as parked and pushed to parked_queue.

You can easily check the invariant is maintained in the algorithm above.

Surprisingly, the senders no longer wait for a shared resource. Instead, each sender waits on its own is_parked state. Even if a sending task is dropped before completion, its entry just remains in parked_queue for a while and doesn't block anything. How clever!

Answered 2018-11-17T14:55:44.350

I doubt this works in the presence of select!, because select! may lead to false notifications.

No, you cannot "confuse" the mpsc channel using select!.

select! does not trigger any mpsc-related notification; it simply returns the future that completes first.

When the message queue is full, await!(recv.next()) notifies one producer that a slot in the bounded channel is now available.

In other words: there are no true waiters and false waiters. When the channel's message queue is full, producers block and wait for the receiver to consume the enqueued messages.

Answered 2018-11-12T09:13:20.347