I'm trying to wrap my head around futures in Rust, but I'm confused by this code, which is supposed to forward messages arriving on rx to sink:
extern crate futures;
extern crate tokio_core;
extern crate websocket;

use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;

use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{thread, time};
use futures::sync::mpsc::Receiver;

fn main() {
    let mut core = Core::new().unwrap();
    let (mut tx, rx) = mpsc::channel(5);
    thread::spawn(|| worker(rx));
    let mut i = 0;
    loop {
        let res = tx.clone().send(OwnedMessage::Text(format!("Test {}", i)));
        core.run(res);
        i += 1;
        let period = time::Duration::from_millis(200);
        thread::sleep(period);
    }
}

fn worker(rx: Receiver<OwnedMessage>) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    // bind to the server
    let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
    let f = server.incoming()
        // we don't wanna save the stream if it drops
        .map_err(|InvalidConnection { error, .. }| error)
        .for_each(|(upgrade, addr)| {
            // accept the request to be a ws connection if it does
            let f = upgrade
                .use_protocol("rust-websocket")
                .accept()
                .and_then(|(s, _)| {
                    let (sink, stream) = s.split();
                    rx // using stream (echoing back) works
                        .forward(sink)
                        .map_err(|error| {
                            error
                        })
                        .and_then(|(a, sink)| {
                            sink.send(OwnedMessage::Close(None))
                        })
                });
            handle.spawn(f.map_err(move |e| println!("Err"))
                          .map(move |_| println!("Done")));
            Ok(())
        });
    core.run(f).expect("somerror");
}
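As noted in the comment, using stream as the input works fine. When I use rx instead, the compiler complains about a type mismatch that (I believe) concerns the error types: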
error[E0271]: type mismatch resolving `<futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>> as futures::Sink>::SinkError == ()`
  --> src/main.rs:47:26
   |
47 |                         .forward(sink)
   |                          ^^^^^^^ expected enum `websocket::WebSocketError`, found ()
   |
   = note: expected type `websocket::WebSocketError`
              found type `()`

error[E0599]: no method named `map_err` found for type `futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>>` in the current scope
  --> src/main.rs:48:26
   |
48 |                         .map_err(|error| {
   |                          ^^^^^^^
   |
   = note: the method `map_err` exists but the following trait bounds were not satisfied:
           `futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>> : futures::Future`
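My tentative reading of the first error, offered as a sketch rather than a confirmed diagnosis: in futures 0.1, Stream::forward carries the bound Self::Error: From<S::SinkError>. mpsc::Receiver has Error = (), while the websocket sink's SinkError is websocket::WebSocketError, so the only way the bound could hold is if the sink's error were also (). The std-only model below reproduces that shape. Every name in it (MiniStream, MiniSink, WsError, Rx, WsSink, MapErr, map_err, demo) is hypothetical and made up for illustration; none of it is the futures or websocket API.

```rust
// Hypothetical stand-in for websocket::WebSocketError (not the real type).
#[derive(Debug, PartialEq)]
enum WsError {
    ChannelClosed,
}

// Toy, synchronous model of a stream with an associated error type.
trait MiniStream {
    type Item;
    type Error;
    fn next_item(&mut self) -> Result<Option<Self::Item>, Self::Error>;
}

// Toy, synchronous model of a sink with its own error type.
trait MiniSink {
    type SinkItem;
    type SinkError;
    fn push(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError>;
}

// Same shape of bound as futures 0.1 `Stream::forward`:
// the stream's error must be constructible from the sink's error.
fn forward<St, Si>(mut stream: St, mut sink: Si) -> Result<Si, St::Error>
where
    St: MiniStream,
    Si: MiniSink<SinkItem = St::Item>,
    St::Error: From<Si::SinkError>,
{
    loop {
        match stream.next_item() {
            Ok(Some(item)) => {
                if let Err(e) = sink.push(item) {
                    // Sink errors are converted into the stream's error type.
                    return Err(From::from(e));
                }
            }
            Ok(None) => return Ok(sink),
            Err(e) => return Err(e),
        }
    }
}

// Models the channel receiver: its error type is `()`.
struct Rx {
    items: std::vec::IntoIter<String>,
}

impl MiniStream for Rx {
    type Item = String;
    type Error = ();
    fn next_item(&mut self) -> Result<Option<String>, ()> {
        Ok(self.items.next())
    }
}

// Models the websocket sink: its error type is `WsError`.
struct WsSink {
    sent: Vec<String>,
}

impl MiniSink for WsSink {
    type SinkItem = String;
    type SinkError = WsError;
    fn push(&mut self, item: String) -> Result<(), WsError> {
        self.sent.push(item);
        Ok(())
    }
}

// Adapter that rewrites a stream's error type with a closure.
struct MapErr<St, F> {
    inner: St,
    f: F,
}

impl<St, F, E> MiniStream for MapErr<St, F>
where
    St: MiniStream,
    F: FnMut(St::Error) -> E,
{
    type Item = St::Item;
    type Error = E;
    fn next_item(&mut self) -> Result<Option<St::Item>, E> {
        self.inner.next_item().map_err(&mut self.f)
    }
}

fn map_err<St, F, E>(inner: St, f: F) -> MapErr<St, F>
where
    St: MiniStream,
    F: FnMut(St::Error) -> E,
{
    MapErr { inner, f }
}

fn demo() -> Result<Vec<String>, WsError> {
    let rx = Rx {
        items: vec!["Test 0".to_string(), "Test 1".to_string()].into_iter(),
    };
    let sink = WsSink { sent: Vec::new() };
    // `forward(rx, sink)` is rejected here: `(): From<WsError>` does not hold.
    // Mapping `()` to `WsError` first makes both error types agree.
    let sink = forward(map_err(rx, |()| WsError::ChannelClosed), sink)?;
    Ok(sink.sent)
}
```

If this reading is right, the analogous move in the real code would presumably be to convert the receiver's error before calling forward (rx.map_err(...)), or to go the other way and map the sink's error to () with sink_map_err. Which WebSocketError value would be appropriate is not something this sketch decides. The second error (E0599) looks like a consequence of the first, since Forward only implements Future when that bound is met.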
These are my dependencies:
[dependencies]
websocket = "0.20.0"
futures = "0.1"
tokio-core = "0.1"
What am I missing here?