我之前问过 我的最终解决方案是在无限循环中将客户端异步重新连接到服务器。这是我问题的第一部分,我无法解决第二部分。
我有一个无界通道,我需要将此通道接收到的数据流式传输到远程服务器。如果与远程服务器的连接丢失,那么我将重新启动它并重试来自同一通道的流数据。
我想到的解决方案是把 传递UnboundedReceiver
给尝试连接服务器的方法,在这个方法中转发数据,UnboundedReceiver
连接丢失时返回接收到的,这样死循环就可以重新取得 的所有权UnboundedReceiver
并传递它再次循环方法。
如何UnboundedReceiver
从我的send_data_to_remote_server()
方法中返回它?
这是一个代码示例:
extern crate futures;
extern crate tokio_core;
extern crate tokio_line;
use futures::future::{self, Future, Loop};
use futures::{Stream, Sink};
use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use tokio_core::io::{Io};
use tokio_core::net::{TcpStream};
use tokio_core::reactor::{Core, Handle};
use tokio_line::LineCodec;
use std::{io, str};
use std::{thread, time};
fn send_data_to_remote_server(handle: &Handle, bufrx: UnboundedReceiver<String>) -> Box<Future<Item = UnboundedReceiver<String>, Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let client = tcp.and_then(move |stream| {
let (sender, receiver) = stream.framed(LineCodec).split();
let reader = receiver
.for_each(|message| {
println!("{}", message);
Ok(())
})
.and_then(|bufrx| {
println!("CLIENT DISCONNECTED");
/***************** START FIXME ************/
let (_, bufrx) = mpsc::unbounded(); //FIXME: `bufrx` received in a clousure is actually an `()` so I can't use it, I put this here only as sample and to satisfy the compiler
/***************** END FIXME ************/
Ok(bufrx)
});
let writer = bufrx
.map_err(|_| io::Error::new(io::ErrorKind::Other, "error kind returned should be the same of `sender` sink in `forward()`"))
.forward(sender)
.and_then(|(bufrx, sender)| {
// bufrx
/***************** START FIXME ************/
let (_, bufrx) = mpsc::unbounded(); //FIXME: `bufrx` received in a clousure is actually an `()` so I can't use it, I put this here only as sample and to satisfy the compiler
/***************** END FIXME ************/
Ok(bufrx)
});
reader.select(writer)
.map(|(bufrx, nf)| {
bufrx
})
.map_err(|(err, nf)| {
err
})
}).or_else(|_| {
println!("connection refuse");
Err(io::Error::new(io::ErrorKind::Other, "connection refuse"))
});
Box::new(client)
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let (buftx, bufrx) = mpsc::unbounded();
let client = future::loop_fn(bufrx, |bufrx| {
send_data_to_remote_server(&handle, bufrx)
.map(|bufrx| -> Loop<UnboundedReceiver<String>, UnboundedReceiver<String>> {
/***************** START FIXME ************/
// actually the `bufrx` received in this clouser is not the original one, but I
// think that the fix should be made in the `send_data_to_remote_server()` method
/***************** END FIXME ************/
Loop::Continue(bufrx)
})
.or_else(|err| -> Result<Loop<UnboundedReceiver<String>, UnboundedReceiver<String>>, ()> {
thread::sleep(time::Duration::from_millis(50));
/***************** START FIXME ************/
let (_, bufrx) = mpsc::unbounded(); //FIXME: argument received in a clousure is actually an `io::Error` and I lost the ownership of the original bufrx so I can't use it, I put this here only as sa↪mple and to satisfy the compiler
/***************** END FIXME ************/
Ok(Loop::Continue(bufrx))
})
});
core.run(client).unwrap();
}
添加 tokio-line 板条箱:
tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }