我有一个概念项目,其中客户端向服务器发送一个数字(PrimeClientRequest
),服务器计算该值是否为素数,并返回一个响应(PrimeClientResponse
)。我希望客户端是一个简单的 CLI,它提示用户输入一个数字,将请求发送到服务器,并显示响应。理想情况下,我想使用TcpClient
来自 Tokio 和来自 Futures-Rs 的 Streams 来做到这一点。
我已经使用服务编写了一个 Tokio 服务器,我想为客户端重用codec
它proto
。
客户端的一部分是一个被调用的函数read_prompt
,它返回一个Stream
. 本质上,它是一个无限循环,每次迭代都会从stdin
.
以下是相关代码:
main.rs
use futures::{Future, Stream};
use std::env;
use std::net::SocketAddr;
use tokio_core::reactor::Core;
use tokio_prime::protocol::PrimeClientProto;
use tokio_prime::request::PrimeRequest;
use tokio_proto::TcpClient;
use tokio_service::Service;
mod cli;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr_string = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let remote_addr = addr_string.parse::<SocketAddr>().unwrap();
println!("Connecting on {}", remote_addr);
let tcp_client = TcpClient::new(PrimeClientProto).connect(&remote_addr, &handle);
core.run(tcp_client.and_then(|client| {
client
.call(PrimeRequest { number: Ok(0) })
.and_then(|response| {
println!("RESP = {:?}", response);
Ok(())
})
})).unwrap();
}
cli.rs
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{io, thread};
use std::io::{Stdin, Stdout};
use std::io::prelude::*;
pub fn read_prompt() -> impl Stream<Item = u64, Error = ()> {
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || loop {
let thread_tx = tx.clone();
let input = prompt(io::stdout(), io::stdin()).unwrap();
let parsed_input = input
.parse::<u64>()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid u64"));
thread_tx.send(parsed_input.unwrap()).wait().unwrap();
});
rx
}
fn prompt(stdout: Stdout, stdin: Stdin) -> io::Result<String> {
let mut stdout_handle = stdout.lock();
stdout_handle.write(b"> ")?;
stdout_handle.flush()?;
let mut buf = String::new();
let mut stdin_handle = stdin.lock();
stdin_handle.read_line(&mut buf)?;
Ok(buf.trim().to_string())
}
使用上面的代码,客户端在客户端终止之前向服务器发送一个请求。我希望能够使用生成的流向流中的每个项目read_prompt
提供输入TcpClient
并发出请求。我该怎么做呢?
完整的代码可以在joshleeb/tokio-prime找到。