-1

我正在尝试在 Tokio 中构建一个回显服务器。我见过一些例子,但它们似乎都使用了 Tokio IO 的 `io::copy`,而我不能使用它,因为我想修改输出。

但是,我无法让同时使用 `writer` 和 `reader` 的服务器通过编译。我想构建一个基于 future 的任务,该任务可以循环读取/写入(回显服务器)。

我的实际代码是这样的:

extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;

use futures::prelude::*;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use futures::Stream;
use futures::stream;
use tokio_io::codec::*;
use std::rc::Rc;

// First attempt from the question: echo every received line back through `writer`,
// handing each connection's future to a `CpuPool`. This version is shown because it
// does NOT compile; the compiler error is quoted next to `pool.spawn` below.
fn main() {
    let pool = CpuPool::new_num_cpus();
    use std::net::*;
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let listener = tokio::net::TcpListener::bind(&socket).unwrap();
    let server = listener.incoming().for_each(|socket| {
        // Frame the connection with `LinesCodec` and split it into two halves.
        let (writer, reader) = socket.framed(LinesCodec::new()).split();
        // The write half is wrapped in `Rc` so the closure below can use it;
        // this `Rc` is the type named in the compiler error.
        let writer = Rc::new(writer);
        let action = reader.for_each(|line| {
            println!("ECHO: {}", line);
            // NOTE(review): the value returned by `send` is discarded here. In futures 0.1
            // `Sink::send` presumably consumes the sink and returns a future that must be
            // driven for anything to be written — confirm against the futures docs.
            writer.send(line);
            Ok(())
        });
        pool.spawn(action); // `std::rc::Rc<futures::stream::SplitSink<tokio_io::codec::Framed<tokio::net::TcpStream, tokio_io::codec::LinesCodec>>>` cannot be shared between threads safely
        Ok(())
    });
    server.wait().unwrap();
}

您可能会说我必须使用 `Arc`,因为涉及不同的线程。我已经尝试过 `Arc` 和 `Mutex`,但是出现了另一个错误,我想不出办法让它编译:

extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;

use futures::prelude::*;
use std::time;
use std::thread;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;
use std::sync::Arc;
use std::sync::Mutex;

// Second attempt from the question: same structure as the first, but the write half is
// wrapped in `Arc<Mutex<_>>` instead of `Rc`. This version is shown because it still does
// NOT compile; the compiler error is quoted next to the `send` call below.
fn main() {
    let pool = CpuPool::new_num_cpus();
    use std::net::*;
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let listener = tokio::net::TcpListener::bind(&socket).unwrap();
    let server = listener.incoming().for_each(|socket| {
        // Frame the connection with `LinesCodec` and split it into two halves.
        let (writer, reader) = socket.framed(LinesCodec::new()).split();
        let writer = Arc::new(Mutex::new(writer));
        // `move` transfers ownership of the `Arc` into the per-line closure.
        let action = reader.for_each(move |line| {
            println!("ECHO: {}", line);
            // NOTE(review): the lock guard only gives borrowed access to the sink, while
            // `send` presumably takes the sink by value (futures 0.1) — which would explain
            // the error quoted on the next line. Confirm against the futures docs.
            writer.lock().unwrap().send(line); // cannot move out of borrowed content
            Ok(())
        });
        pool.spawn(action);
        Ok(())
    });
    server.wait().unwrap();
}

编译器报告的错误是:`cannot move out of borrowed content`

4

1 回答 1

1

我终于发现 `forward` 就是我的问题的答案。

extern crate tokio;
extern crate tokio_io;
extern crate futures;

use futures::prelude::*;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;


struct Cancellable{
    rx: std::sync::mpsc::Receiver<()>,
}

impl Future for Cancellable {
    type Item = ();
    type Error = std::sync::mpsc::RecvError;

    fn poll(&mut self) -> Result<Async<Self::Item>,Self::Error> {
        match self.rx.try_recv() {
            Ok(_) => Ok(Async::Ready(())),
            Err(_) => Ok(Async::NotReady)
        }
    }
}

// Single-threaded echo server: every line received on a connection is printed and written
// back to the same connection; receiving the line "bye" signals the connection's
// `Cancellable`, which is raced against the echo pipeline.
fn main() {
    use std::net::*;

    // Listen on 127.0.0.1:8080.
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let listener = tokio::net::TcpListener::bind(&addr).unwrap();

    let server = listener
        .incoming()
        .for_each(|socket| {
            let (writer, reader) = socket.framed(LinesCodec::new()).split();
            let (tx, rx) = std::sync::mpsc::channel();
            let cancel = Cancellable { rx };

            // Log each line, signal cancellation on "bye", and pass the line on unchanged.
            let echoed = reader.map(move |line| {
                println!("ECHO: {}", line);
                if line == "bye" {
                    println!("BYE");
                    tx.send(()).unwrap();
                }
                line
            });

            // Forward the lines into the write half, racing against the cancel future.
            let action = echoed
                .forward(writer)
                .select2(cancel)
                .map(|_| ())
                .map_err(|_| {
                    println!("error");
                });
            tokio::executor::current_thread::spawn(action);

            Ok(())
        })
        .map_err(|err| {
            println!("error = {:?}", err);
        });

    tokio::executor::current_thread::run(|_| {
        tokio::executor::current_thread::spawn(server);
    });
}
于 2018-02-27T22:37:16.417 回答