1

我正试图围绕 Rust 中的未来进行思考,但我对这段代码感到困惑,该代码应该将消息发送rxsink

extern crate futures;
extern crate tokio_core;
extern crate websocket;

use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;

use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{thread, time};
use futures::sync::mpsc::Receiver;

fn main() {
    let mut core = Core::new().unwrap();
    let (mut tx, rx) = mpsc::channel(5);
    thread::spawn(|| worker(rx));
    let mut i = 0;
    loop {
        let res = tx.clone().send(OwnedMessage::Text(format!("Test {}", i)));
        core.run(res);
        i += 1;
        let period = time::Duration::from_millis(200);
        thread::sleep(period);
    }
}

fn worker(rx: Receiver<OwnedMessage>) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    // bind to the server
    let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
    let f = server.incoming()
            // we don't wanna save the stream if it drops
            .map_err(|InvalidConnection { error, .. }| error)
            .for_each(|(upgrade, addr)| {
                // accept the request to be a ws connection if it does
                let f = upgrade
                    .use_protocol("rust-websocket")
                    .accept()
                    .and_then(|(s, _)| {
                        let (sink, stream) = s.split();
                        rx // using stream (echoing back) works
                            .forward(sink)
                            .map_err(|error| {
                                error
                            })
                            .and_then(|(a, sink)| {
                                sink.send(OwnedMessage::Close(None))
                            })
                    });

                handle.spawn(f.map_err(move |e| println!("Err"))
                    .map(move |_| println!("Done")));
                Ok(())
            });
    core.run(f).expect("somerror");
}

如评论中所述,stream用作输入工作正常。使用rx时,编译器会抱怨关于错误类型的类型不匹配(我相信):

error[E0271]: type mismatch resolving `<futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>> as futures::Sink>::SinkError == ()`
  --> src/main.rs:47:26
   |
47 |                         .forward(sink)
   |                          ^^^^^^^ expected enum `websocket::WebSocketError`, found ()
   |
   = note: expected type `websocket::WebSocketError`
              found type `()`

error[E0599]: no method named `map_err` found for type `futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>>` in the current scope
  --> src/main.rs:48:26
   |
48 |                         .map_err(|error| {
   |                          ^^^^^^^
   |
   = note: the method `map_err` exists but the following trait bounds were not satisfied:
           `futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>> : futures::Future`

这些是我的依赖项:

[dependencies]
websocket = "0.20.0"
futures = "0.1"
tokio-core = "0.1"

我在这里想念什么?

4

1 回答 1

10
error[E0271]: type mismatch resolving
  `<futures::stream::SplitSink<
        websocket::client::async::Framed<
            tokio_core::net::TcpStream,
            websocket::async::MessageCodec<websocket::OwnedMessage>>>
   as futures::Sink>::SinkError == ()`

我们这里有两种类型:<futures::stream::SplitSink<...> as futures::Sink>::SinkError()。这两种类型从何而来?另外,第一个是未解析的关联类型;也许我们可以解决它以获得更多的洞察力?让我们一步一步地追踪它。

首先,我们需要弄清楚为什么编译器首先要尝试匹配这两种类型。如果我们查看 的签名forward,我们将看到约束Self::Error: From<S::SinkError>Self是我们正在调用的流的类型forward,而S是作为参数传递给forward.

我们正在调用forwardrx其类型为futures::sync::mpsc::Receiver。在 的文档页面上Receiver,我们可以看到以下内容:

impl<T> Stream for Receiver<T>
  type Item = T
  type Error = ()

这向我们展示了它的()来源。现在让我们看一下sink论证。

的类型sinkfutures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>(我们从错误消息中知道这一点;RLS也确认了这一点)。在 的文档页面上SplitSink,我们有:

impl<S: Sink> Sink for SplitSink<S>
  type SinkItem = S::SinkItem
  type SinkError = S::SinkError

So SplitSink'sSinkError与它的内部水槽's 相同SinkError。内水槽的类型是websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>文档说明了Framed什么?

impl<T, U> Sink for Framed<T, U>
where
    T: AsyncWrite,
    U: Encoder,
    <U as Encoder>::Error: From<Error>,
  type SinkItem = <U as Encoder>::Item
  type SinkError = <U as Encoder>::Error

Framed有两个类型参数,但是我们只需要看第二个,也就是websocket::async::MessageCodec<websocket::OwnedMessage>这里,来判断SinkError类型。现在让我们来看看MessageCodec。(注:websocket::codec::ws::MessageCodec转出口websocket::async::MessageCodec

impl<M> Decoder for MessageCodec<M>
where
    M: MessageTrait,
  type Item = OwnedMessage
  type Error = WebSocketError

啊哈!接收器产生类型错误WebSocketError


现在我们已经弄清楚了类型,让我们回到为什么我们首先关心类型。我们试图了解为什么Self::Error: From<S::SinkError>在调用forward. 我们现在知道编译器正在尝试解析(): From<WebSocketError>. 好像impl From<WebSocketError> for ()没有 让我们验证一下:

extern crate websocket;

fn main() {
    let a = websocket::result::WebSocketError::NoDataAvailable;
    let () = From::from(a);
}

实际上,这无法编译:

error[E0277]: the trait bound `(): std::convert::From<websocket::WebSocketError>` is not satisfied
 --> src/main.rs:5:14
  |
5 |     let () = From::from(a);
  |              ^^^^^^^^^^ the trait `std::convert::From<websocket::WebSocketError>` is not implemented for `()`
  |
  = note: required by `std::convert::From::from`

我们可以通过使用sink_map_errto changesink的错误类型来解决缺少的实现。

let (sink, stream) = s.split();
let sink = sink.sink_map_err(|_| ()); // <<<<<
rx
    .forward(sink)
    .and_then(|(a, sink)| {
        sink.send(OwnedMessage::Close(None))
    })

这解决了对 的调用forward,但是现在这个闭包的结果不与 组合upgrade.use_protocol("rust-websocket").accept(),它仍然具有WebSocketError它的错误类型。rx改为更改的错误类型更有意义。但是我们如何WebSocketError从 a构造 a (),它不携带任何信息?

您可能想知道,为什么Receiver使用()它的错误类型?如果我们查看源代码,我们可以看到,实际上poll 永远不会返回错误。我认为如果错误类型是!(never 类型)或其他一些 void 类型会更合适,以清楚地表明错误是不可能的;期货存在一个问题,要求对期货 0.2 进行此更改。

由于错误是不可能的,我们不需要构造一个WebSocketError; 我们可以改为分歧,例如通过恐慌。

fn worker(rx: Receiver<OwnedMessage>) {
    let rx = rx.map_err(|()| panic!("Receiver should never fail!"));
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    // bind to the server
    let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
    let f = server.incoming()
            // we don't wanna save the stream if it drops
            .map_err(|InvalidConnection { error, .. }| error)
            .for_each(|(upgrade, addr)| {
                // accept the request to be a ws connection if it does
                let f = upgrade
                    .use_protocol("rust-websocket")
                    .accept()
                    .and_then(|(s, _)| {
                        let (sink, stream) = s.split();
                        rx
                            .forward(sink)
                            .and_then(|(a, sink)| {
                                sink.send(OwnedMessage::Close(None))
                            })
                    });

                handle.spawn(f.map_err(move |e| println!("Err"))
                    .map(move |_| println!("Done")));
                Ok(())
            });
    core.run(f).expect("somerror");
}

现在,仍然有一个错误:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> src/main.rs:43:31
   |
30 |     let rx = rx.map_err(|()| panic!("Receiver should never fail!"));
   |         -- captured outer variable
...
43 |                     .and_then(|(s, _)| {
   |                               ^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure

为什么关闭试图移动rx?因为按价值forward取值self。为什么闭包是一个FnMut?注意,Future::and_then需要一个FnOnce(将值从捕获的变量移动到FnOnce闭包中是有效的),但Stream::for_each需要一个FnMut. 这是有道理的:for_each将为每个传入连接调用一次闭包!

您正在使用的通道是多生产者、单消费者(因此命名为mpsc),但您试图在这里拥有多个消费者(每个连接都试图从接收器读取)。我会留给你在你的程序中解决这个设计问题。请记住,可以有多个并发客户端连接!

于 2017-12-01T01:44:26.563 回答