5

我有一个 5GB JSON 文件,它是一组具有固定结构的对象:

[
  {
    "first": "John",
    "last": "Doe",
    "email": "john.doe@yahoo.com"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "anne.ortha@hotmail.com"
  },
  ....
]

我知道我可以尝试使用如何使用 Serde 使用顶级数组反序列化 JSON?

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct User {
    first: String,
    last: String,
    email: String,
}

let users: Vec<User> = serde_json::from_str(file)?;

有多个问题:

  1. 首先将其作为一个整体读取
  2. 读取为字符串后,它将其转换为User结构向量(我不想要那个)

我尝试了如何从 Rust 中的文件/流中懒惰地读取多个 JSON 值?但它会在打印任何内容之前读取整个文件,并在循环内一次打印整个结构。我期待在循环中一次一个对象:

在此处输入图像描述

理想情况下,(解析的)用户对象的解析和处理应该在两个单独的线程/任务/例程中同时发生或通过使用通道发生。

4

2 回答 2

1

从 JSON 数组流式传输元素是可能的,但需要一些繁琐的工作。您必须自己跳过领先[和间歇,,以及检测最后的]. 要解析单个数组元素,您需要使用StreamDeserializer并从中提取单个项目(这样您就可以删除它并重新获得对 IO 读取器的控制权)。例如:

use serde::de::DeserializeOwned;
use serde_json::{self, Deserializer};
use std::io::{self, Read};

fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
    loop {
        let mut byte = 0u8;
        reader.read_exact(std::slice::from_mut(&mut byte))?;
        if !byte.is_ascii_whitespace() {
            return Ok(byte);
        }
    }
}

fn invalid_data(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
    let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
    match next_obj {
        Some(result) => result.map_err(Into::into),
        None => Err(invalid_data("premature EOF")),
    }
}

fn yield_next_obj<T: DeserializeOwned, R: Read>(
    mut reader: R,
    at_start: &mut bool,
) -> io::Result<Option<T>> {
    if !*at_start {
        *at_start = true;
        if read_skipping_ws(&mut reader)? == b'[' {
            // read the next char to see if the array is empty
            let peek = read_skipping_ws(&mut reader)?;
            if peek == b']' {
                Ok(None)
            } else {
                deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
            }
        } else {
            Err(invalid_data("`[` not found"))
        }
    } else {
        match read_skipping_ws(&mut reader)? {
            b',' => deserialize_single(reader).map(Some),
            b']' => Ok(None),
            _ => Err(invalid_data("`,` or `]` not found")),
        }
    }
}

pub fn iter_json_array<T: DeserializeOwned, R: Read>(
    mut reader: R,
) -> impl Iterator<Item = Result<T, io::Error>> {
    let mut at_start = false;
    std::iter::from_fn(move || yield_next_obj(&mut reader, &mut at_start).transpose())
}

示例用法:

fn main() {
    let data = r#"[
  {
    "first": "John",
    "last": "Doe",
    "email": "john.doe@yahoo.com"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "anne.ortha@hotmail.com"
  }
]"#;
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug)]
    struct User {
        first: String,
        last: String,
        email: String,
    }

    for user in iter_json_array(io::Cursor::new(&data)) {
        let user: User = user.unwrap();
        println!("{:?}", user);
    }
}

操场

在生产中使用它时,您会打开它File而不是将其读取到字符串中。与往常一样,不要忘记FileBufReader.

于 2021-08-03T21:52:05.210 回答
1

从 serde_json 1.0.66 开始,这不可能直接实现。

建议的一种解决方法是实现您自己Visitor的使用通道的方法。随着数组反序列化的进行,每个元素都被推下通道。然后通道的接收端可以抓取每个元素并对其进行处理,从而为反序列化腾出空间以推入另一个值。

于 2021-08-03T20:37:53.883 回答