3

我正在使用 tokio 来实现一个服务器,该服务器与使用 serde(bincode)序列化的消息进行通信。如果没有异步和期货,我会做

extern crate tokio_io;
extern crate bincode;
extern crate serde;
extern crate bytes;
extern crate futures;
#[macro_use] extern crate serde_derive;

use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all};
use bincode::{serialize, deserialize, deserialize_from, Infinite, serialized_size};
use std::io::Read;
use std::io::Cursor;
use futures::future::Future;

type Item = String; // Dummy, this is a complex struct with derived Serizalize
type Error = bincode::Error;

// This works
fn decode<R>(reader: &mut R) -> Result<Item, Error> where R: Read {
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

fn main() {

    let ser = serialize("Test", Infinite).unwrap();
    let buf = Cursor::new(ser);

    let mut reader = std::io::BufReader::new(buf);

    println!("{:?}", decode(&mut reader))
}

但我需要的是一个decode可以与异步套接字一起使用的函数

// I need this since I get the reader from a (tokio) socket as
// let socket = TcpListener::bind(&addr, &handle).unwrap();
// let (reader, writer) = socket.split();
fn decode_async<R>(reader: R) -> Result<Item, Error> where R: AsyncRead {
    // Does not work:   
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

我唯一的想法是在编码期间手动将长度写入缓冲区,然后使用 read_exact:

// Encode with size
fn encode_async(item: &Item) -> Result<Vec<u8>, Error>{
    let size = serialized_size(item);
    let mut buf = serialize(&size, Infinite).unwrap();
    let ser = serialize(item, Infinite).unwrap();
    buf.extend(ser);
    Ok(buf)
}

// Decode with size
fn decode_async<R>(reader: R) -> Box<Future<Item = Item, Error = std::io::Error>>
    where R: AsyncRead + 'static {

    let read = read_exact(reader, vec![0u8; 8]).and_then(|(reader, buf)| {
        let size = deserialize::<u64>(&mut &buf[..]).unwrap();
        Ok((reader, size as usize))
    }).and_then(|(reader, size)| {
        read_exact(reader, vec![0u8; size])
    }).and_then(|(reader, buf)| {
        let item = deserialize(&mut &buf[..]).unwrap();
        Ok(item)
    });

    Box::new(read)
}

fn main() {

    let ser = encode_async(&String::from("Test")).unwrap();
    let buf = Cursor::new(ser);

    let mut reader = std::io::BufReader::new(buf);
    let dec = decode_async(reader).wait();

    println!("{:?}", dec)
}

有没有更好的方法来实现解码?

4

2 回答 2

4

deserialize_from无法处理 IO 错误,尤其WouldBlock是异步(非阻塞)Readers 在等待更多数据时返回的错误。这受到接口的限制:deserialize_from不返回一个Future或部分状态,它返回完整的解码Result并且不知道如何将Reader 与事件循环结合起来处理WouldBlock而不忙循环。

理论上,可以实现async_deserialize_from,但不能使用提供的接口,serde除非您提前读取完整数据以进行解码,否则会破坏目的。

tokio_io::io::read_to_end如果您知道“无休止”流(或后跟其他数据的流)中编码数据的大小,则需要使用或tokio_io::io::read_exact(您当前正在使用的内容)读取完整数据。

于 2018-02-01T11:58:07.567 回答
0

Stefan 的回答是正确的,但是您可能有兴趣查看为您执行此操作的 tokio-serde-* 板条箱系列,特别是tokio-serde-bincode。从自述文件:

使用 serde 轻松实现 Tokio Bincode 传输所需的实用程序,用于帧值的序列化和反序列化。基于 tokio-serde。

crate 有几个如何使用它的示例。

于 2019-06-10T20:45:13.017 回答