24

I'm not able to create a client that tries to connect to a server and:

  • if the server is down it has to try again in an infinite loop
  • if the server is up and connection is successful, when the connection is lost (i.e. server disconnects the client) the client has to restart the infinite loop to try to connect to the server

Here's the code to connect to a server; currently when the connection is lost the program exits. I'm not sure what the best way to implement it is; maybe I have to create a Future with an infinite loop?

extern crate tokio_line;
use tokio_line::LineCodec;

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {                                                                                                                                   
    let remote_addr = "127.0.0.1:9876".parse().unwrap();                                                                                                                                                            
    let tcp = TcpStream::connect(&remote_addr, handle);                                                                                                                                                             

    let client = tcp.and_then(|stream| {                                                                                                                                                                            
        let (sink, from_server) = stream.framed(LineCodec).split();                                                                                                                                                 
        let reader = from_server.for_each(|message| {                                                                                                                                                               
            println!("{}", message);                                                                                                                                                                                
            Ok(())                                                                                                                                                                                                  
        });                                                                                                                                                                                                         

        reader.map(|_| {                                                                                                                                                                                            
            println!("CLIENT DISCONNECTED");                                                                                                                                                                        
            ()                                                                                                                                                                                                      
        }).map_err(|err| err)                                                                                                                                                                                       
    });                                                                                                                                                                                                             

    let client = client.map_err(|_| { panic!()});                                                                                                                                                                   
    Box::new(client)                                                                                                                                                                                                
}                                                                                                                                                                                                                   

fn main() {                                                                                                                                                                                                         
    let mut core = Core::new().unwrap();                                                                                                                                                                            
    let handle = core.handle();                                                                                                                                                                                     
    let client = get_connection(&handle);                                                                                                                                                                           

    let client = client.and_then(|c| {                                                                                                                                                                              
        println!("Try to reconnect");                                                                                                                                                                               
        get_connection(&handle);                                                                                                                                                                                    
        Ok(())                                                                                                                                                                                                      
    });                                                                                                                                                                                                             

    core.run(client).unwrap();                                                                                                                                                                                      
}

Add the tokio-line crate with:

tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
4

1 回答 1

19

关键问题似乎是:如何使用 Tokio 实现无限循环?通过回答这个问题,我们可以解决断开时无限重新连接的问题。根据我编写异步代码的经验,递归似乎是解决这个问题的直接方法。

更新:正如 Shepmaster(以及 Tokio Gitter 的人)所指出的,我最初的答案会泄漏内存,因为我们构建了一个在每次迭代中增长的期货链。下面是一个新的:

更新答案:使用loop_fn

板条箱中有一个功能futures可以完全满足您的需求。它被称为loop_fn。您可以通过将主要功能更改为以下内容来使用它:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = future::loop_fn((), |_| {
        // Run the get_connection function and loop again regardless of its result
        get_connection(&handle).map(|_| -> Loop<(), ()> {
            Loop::Continue(())
        })
    });

    core.run(client).unwrap();
}

该函数类似于一个 for 循环,它可以根据结果继续或中断get_connection(请参阅Loop枚举的文档)。在这种情况下,我们选择始终继续,因此它将无限地保持重新连接。

请注意,get_connection如果出现错误(例如,如果客户端无法连接到服务器),您的版本将会恐慌。如果您还想在出错后重试,则应删除对panic!.


旧答案:使用递归

以下是我的旧答案,以防有人觉得有趣。

警告:使用下面的代码会导致内存无限增长。

无限get_connection循环

我们想在get_connection每次客户端断开连接时调用该函数,所以这正是我们要做的(看后面的注释reader.and_then):

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
    let remote_addr = "127.0.0.1:9876".parse().unwrap();
    let tcp = TcpStream::connect(&remote_addr, handle);
    let handle_clone = handle.clone();

    let client = tcp.and_then(|stream| {
        let (sink, from_server) = stream.framed(LineCodec).split();
        let reader = from_server.for_each(|message| {
            println!("{}", message);
            Ok(())
        });

        reader.and_then(move |_| {
            println!("CLIENT DISCONNECTED");
            // Attempt to reconnect in the future
            get_connection(&handle_clone)
        })
    });

    let client = client.map_err(|_| { panic!()});
    Box::new(client)
}

请记住,这get_connection是非阻塞的。它只是构造一个Box<Future>. 这意味着当递归调用它时,我们仍然不会阻塞。相反,我们得到了一个新的未来,我们可以使用and_then. 如您所见,这与正常递归不同,因为堆栈不会在每次迭代时增长。

请注意,我们需要克隆handle(参见handle_clone),并将其移动到传递给的闭包中reader.and_then。这是必要的,因为闭包的寿命将比函数长(它将包含在我们返回的未来)。

处理错误

您提供的代码无法处理客户端无法连接到服务器的情况(也没有任何其他错误)。按照上面所示的相同原则,我们可以通过将结尾更改为get_connection以下内容来处理错误:

let handle_clone = handle.clone();
let client = client.or_else(move |err| {
    // Note: this code will infinitely retry, but you could pattern match on the error
    // to retry only on certain kinds of error
    println!("Error connecting to server: {}", err);
    get_connection(&handle_clone)
});
Box::new(client)

请注意,or_else它类似于and_then,但它对未来产生的错误进行操作。

删除不必要的代码main

最后,没有必要and_thenmain函数中使用。您可以用main以下代码替换您的:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = get_connection(&handle);
    core.run(client).unwrap();
}
于 2017-02-08T09:54:52.730 回答