0

我有一个概念项目,其中客户端向服务器发送一个数字(PrimeClientRequest),服务器计算该值是否为素数,并返回一个响应(PrimeClientResponse)。我希望客户端是一个简单的 CLI,它提示用户输入一个数字,将请求发送到服务器,并显示响应。理想情况下,我想使用TcpClient来自 Tokio 和来自 Futures-Rs 的 Streams 来做到这一点。

我已经使用服务编写了一个 Tokio 服务器,我想为客户端重用codecproto

客户端的一部分是一个被调用的函数read_prompt,它返回一个Stream. 本质上,它是一个无限循环,每次迭代都会从stdin.

以下是相关代码:

main.rs

use futures::{Future, Stream};
use std::env;
use std::net::SocketAddr;
use tokio_core::reactor::Core;
use tokio_prime::protocol::PrimeClientProto;
use tokio_prime::request::PrimeRequest;
use tokio_proto::TcpClient;
use tokio_service::Service;

mod cli;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let addr_string = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let remote_addr = addr_string.parse::<SocketAddr>().unwrap();

    println!("Connecting on {}", remote_addr);
    let tcp_client = TcpClient::new(PrimeClientProto).connect(&remote_addr, &handle);

    core.run(tcp_client.and_then(|client| {
        client
            .call(PrimeRequest { number: Ok(0) })
            .and_then(|response| {
                println!("RESP = {:?}", response);
                Ok(())
            })
    })).unwrap();
}

cli.rs

use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{io, thread};
use std::io::{Stdin, Stdout};
use std::io::prelude::*;

pub fn read_prompt() -> impl Stream<Item = u64, Error = ()> {
    let (tx, rx) = mpsc::channel(1);

    thread::spawn(move || loop {
        let thread_tx = tx.clone();

        let input = prompt(io::stdout(), io::stdin()).unwrap();
        let parsed_input = input
            .parse::<u64>()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid u64"));

        thread_tx.send(parsed_input.unwrap()).wait().unwrap();
    });

    rx
}

fn prompt(stdout: Stdout, stdin: Stdin) -> io::Result<String> {
    let mut stdout_handle = stdout.lock();
    stdout_handle.write(b"> ")?;
    stdout_handle.flush()?;

    let mut buf = String::new();
    let mut stdin_handle = stdin.lock();
    stdin_handle.read_line(&mut buf)?;

    Ok(buf.trim().to_string())
}

使用上面的代码,客户端在客户端终止之前向服务器发送一个请求。我希望能够使用生成的流向流中的每个项目read_prompt提供输入TcpClient并发出请求。我该怎么做呢?

完整的代码可以在joshleeb/tokio-prime找到。

4

1 回答 1

0

我(到目前为止)提出的解决方案是LoopFn在 Future-Rs 板条箱中使用。这并不理想,因为仍然需要建立新的连接,但这至少是朝着正确方向迈出的一步。

main.rs

use futures::{future, Future};
use std::{env, io};
use std::net::SocketAddr;
use tokio_core::reactor::{Core, Handle};
use tokio_prime::protocol::PrimeClientProto;
use tokio_prime::request::PrimeRequest;
use tokio_proto::TcpClient;
use tokio_service::Service;

mod cli;

fn handler<'a>(
    handle: &'a Handle, addr: &'a SocketAddr
) -> impl Future<Item = (), Error = ()> + 'a {
    cli::prompt(io::stdin(), io::stdout())
        .and_then(move |number| {
            TcpClient::new(PrimeClientProto)
                .connect(addr, handle)
                .and_then(move |client| Ok((client, number)))
        })
        .and_then(|(client, number)| {
            client
                .call(PrimeRequest { number: Ok(number) })
                .and_then(|response| {
                    println!("{:?}", response);
                    Ok(())
                })
        })
        .or_else(|err| {
            println!("! {}", err);
            Ok(())
        })
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let addr_string = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let remote_addr = addr_string.parse::<SocketAddr>().unwrap();

    println!("Connecting on {}", remote_addr);

    let client = future::loop_fn((), |_| {
        handler(&handle, &remote_addr)
            .map(|_| -> future::Loop<(), ()> { future::Loop::Continue(()) })
    });

    core.run(client).ok();
}

cli.rs

use futures::prelude::*;
use std::io;
use std::io::{Stdin, Stdout};
use std::io::prelude::*;

#[async]
pub fn prompt(stdin: Stdin, stdout: Stdout) -> io::Result<u64> {
    let mut stdout_handle = stdout.lock();
    stdout_handle.write(b"> ")?;
    stdout_handle.flush()?;

    let mut buf = String::new();
    let mut stdin_handle = stdin.lock();
    stdin_handle.read_line(&mut buf)?;

    parse_input(buf.trim().to_string())
}

fn parse_input(s: String) -> io::Result<u64> {
    s.parse::<u64>()
        .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid u64"))
}
于 2018-02-04T07:02:55.910 回答