0

我正在尝试创建一个基本的 tcp 服务器:

  1. 服务器应该能够向所有连接的客户端广播消息流
  2. 服务器应该能够接收来自所有客户端的命令并处理它们

这就是我的main功能:

let (server_tx, server_rx) = mpsc::unbounded();
let state = Arc::new(Mutex::new(Shared::new(server_tx)));

let addr = "127.0.0.1:6142".parse().unwrap();

let listener = TcpListener::bind(&addr).unwrap();

let server = listener.incoming().for_each(move |socket| {
    // Spawn a task to process the connection
    process(socket, state.clone());
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

println!("server running on localhost:6142");

let _messages = server_rx.for_each(|_| {
    // process messages here
    Ok(())
}).map_err(|err| {
    println!("message error = {:?}", err);
});

tokio::run(server);  

操场

我使用chat.rs来自 tokio 存储库的示例作为基础。
我正在向server_tx传入的 tcp 消息发送数据。
我遇到的麻烦是消费它们。
我正在使用“消费”传入消息流server_rx.for_each(|_| {,现在,我如何告诉 tokio 运行它?

tokio::run接受一个未来,但我有 2 个(可能更多)。我如何组合它们以使它们并行运行?

4

1 回答 1

4

一起加入期货:

let messages = server_rx.for_each(|_| {
    println!("Message broadcasted");
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

tokio::run(server.join(messages).map(|_| ()));

需要组合器map(),因为关联类型是一个元组并 使用需要一个类型的未来任务Join Item((), ())tokio::run()Future::Item()

于 2018-10-29T08:40:07.100 回答