
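I asked a related question earlier, and my final solution was to reconnect the client to the server asynchronously in an infinite loop. That was the first part of my problem; I have not been able to solve the second part.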

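I have an unbounded channel, and I need to stream the data received on this channel to a remote server. If the connection to the remote server is lost, I restart it and retry streaming data from the same channel.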

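The solution I came up with is to pass the `UnboundedReceiver` to the method that tries to connect to the server, forward the data inside that method, and return the received `UnboundedReceiver` when the connection is lost. That way the infinite loop can regain ownership of the `UnboundedReceiver` and pass it to the method again on the next iteration.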
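The ownership round-trip described above can be sketched as a synchronous analogue that uses only the standard library. This is not the futures-based API from the question: `std::sync::mpsc::Receiver` stands in for `UnboundedReceiver`, and `send_until_disconnect`, `run_demo`, and the `budget` parameter are hypothetical names introduced only for illustration. The point it tries to show is that the failure path returns the receiver together with the error, so the loop never loses it.

```rust
use std::io;
use std::sync::mpsc::{self, Receiver};

// Hypothetical stand-in for one connection attempt: it drains `rx` into `sink`
// and simulates a lost connection after `budget` messages.
// On both the success and the failure path the receiver goes back to the caller.
fn send_until_disconnect(
    rx: Receiver<String>,
    sink: &mut Vec<String>,
    budget: usize,
) -> Result<Receiver<String>, (io::Error, Receiver<String>)> {
    for _ in 0..budget {
        match rx.try_recv() {
            Ok(line) => sink.push(line),
            // Channel is empty or closed: nothing left to forward.
            Err(_) => return Ok(rx),
        }
    }
    // Simulated connection loss: return the error together with the receiver.
    let err = io::Error::new(io::ErrorKind::Other, "connection lost");
    Err((err, rx))
}

// Assumed driver loop: keeps "reconnecting" until the channel is drained.
fn run_demo() -> (Vec<String>, usize) {
    let (tx, mut rx) = mpsc::channel();
    for i in 0..5 {
        tx.send(format!("line {}", i)).unwrap();
    }
    drop(tx);

    let mut delivered = Vec::new();
    let mut reconnects = 0;
    loop {
        match send_until_disconnect(rx, &mut delivered, 2) {
            Ok(_) => break,
            Err((_err, returned)) => {
                // Regain ownership of the same receiver and try again.
                rx = returned;
                reconnects += 1;
            }
        }
    }
    (delivered, reconnects)
}

fn main() {
    let (delivered, reconnects) = run_demo();
    println!("delivered {} lines after {} reconnects", delivered.len(), reconnects);
}
```

In a futures-based version the same shape would presumably appear as an error type that carries the receiver, but how to get the receiver back out of the combinators is exactly what remains open here.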

How can I return the `UnboundedReceiver` from my `send_data_to_remote_server()` method?

Here is a code example:

extern crate futures;
extern crate tokio_core;
extern crate tokio_line;

use futures::future::{self, Future, Loop};
use futures::{Stream, Sink};
use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use tokio_core::io::{Io};
use tokio_core::net::{TcpStream};
use tokio_core::reactor::{Core, Handle};
use tokio_line::LineCodec;
use std::{io, str};
use std::{thread, time};


fn send_data_to_remote_server(handle: &Handle, bufrx: UnboundedReceiver<String>) -> Box<Future<Item = UnboundedReceiver<String>, Error = io::Error>> {
    let remote_addr = "127.0.0.1:9876".parse().unwrap();
    let tcp = TcpStream::connect(&remote_addr, handle);

    let client = tcp.and_then(move |stream| {
        let (sender, receiver) = stream.framed(LineCodec).split();
        let reader = receiver
            .for_each(|message| {
                println!("{}", message);
                Ok(())
            })
            .and_then(|bufrx| {
                println!("CLIENT DISCONNECTED");
                /***************** START FIXME ************/
                let (_, bufrx) = mpsc::unbounded(); // FIXME: the `bufrx` received in this closure is actually `()`, so I can't use it; this line is only a placeholder to satisfy the compiler
                /***************** END FIXME ************/
                Ok(bufrx)
            });

        let writer = bufrx
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "error kind returned should be the same of `sender` sink in `forward()`"))
            .forward(sender)
            .and_then(|(bufrx, sender)| {
                // bufrx
                /***************** START FIXME ************/
                let (_, bufrx) = mpsc::unbounded(); // FIXME: the `bufrx` received in this closure is not the original `UnboundedReceiver`, so I can't use it; this line is only a placeholder to satisfy the compiler
                /***************** END FIXME ************/
                Ok(bufrx)
            });

        reader.select(writer)
            .map(|(bufrx, nf)| {
                bufrx
            })
            .map_err(|(err, nf)| {
                err
            })

    }).or_else(|_| {
        println!("connection refuse");
        Err(io::Error::new(io::ErrorKind::Other, "connection refuse"))
    });

    Box::new(client)
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let (buftx, bufrx) = mpsc::unbounded();

    let client = future::loop_fn(bufrx, |bufrx| {
        send_data_to_remote_server(&handle, bufrx)
            .map(|bufrx| -> Loop<UnboundedReceiver<String>, UnboundedReceiver<String>> {
                /***************** START FIXME ************/
                // the `bufrx` received in this closure is not the original one, but I
                // think the fix should be made in the `send_data_to_remote_server()` method
                /***************** END FIXME ************/
                Loop::Continue(bufrx)
            })
            .or_else(|err| -> Result<Loop<UnboundedReceiver<String>, UnboundedReceiver<String>>, ()> {
                thread::sleep(time::Duration::from_millis(50));
                /***************** START FIXME ************/
                let (_, bufrx) = mpsc::unbounded(); // FIXME: the argument received in this closure is actually an `io::Error`, and I have lost ownership of the original `bufrx`, so I can't use it; this line is only a placeholder to satisfy the compiler
                /***************** END FIXME ************/

                Ok(Loop::Continue(bufrx))
            })
    });

    core.run(client).unwrap();
}

To add the tokio-line crate:

tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }