我正在尝试同时处理 Tokio 中到达的 UDP 数据包。但是,以下 MWE 并没有达到我的预期:
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::{Future, Stream};
use std::net::SocketAddr;
use tokio_core::net::{UdpCodec, UdpSocket};
use tokio_core::reactor::Core;
// just a codec to send and receive bytes
pub struct LineCodec;
impl UdpCodec for LineCodec {
type In = (SocketAddr, Vec<u8>);
type Out = (SocketAddr, Vec<u8>);
fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> std::io::Result<Self::In> {
Ok((*addr, buf.to_vec()))
}
fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
into.extend(buf);
addr
}
}
fn compute(addr: SocketAddr, msg: Vec<u8>) -> Box<Future<Item = (), Error = ()>> {
println!("Starting to compute for: {}", addr);
// sleep is a placeholder for a long computation
std::thread::sleep(std::time::Duration::from_secs(8));
println!("Done computing for for: {}", addr);
Box::new(futures::future::ok(()))
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let listening_addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
let socket = UdpSocket::bind(&listening_addr, &handle).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
let (writer, reader) = socket.framed(LineCodec).split();
let socket_read = reader.for_each(|(addr, msg)| {
println!("Got {:?}", msg);
handle.spawn(compute(addr, msg));
Ok(())
});
core.run(socket_read).unwrap();
}
连接两个终端$ nc -u localhost 8080
并发送一些文本,我可以看到来自第二个终端的消息在第一个终端完成后处理。
我必须改变什么?