1

问题描述

使用serde_json将很长的对象数组反序列化为 aVec<T>可能需要很长时间,因为必须预先将整个数组读入内存。我想迭代数组中的项目,以避免预先处理和内存要求。

到目前为止我的方法

StreamDeserializer不能直接使用,因为它只能迭代背靠背放置的自定界类型。所以到目前为止我所做的是编写一个自定义结构来实现Read,包装另一个Read但省略开始和结束方括号以及任何逗号。

例如,阅读器会将 JSON[{"name": "foo"}, {"name": "bar"}, {"name": "baz"}]转换为{"name": "foo"} {"name": "bar"} {"name": "baz"}可以与StreamDeserializer.

这是完整的代码:

use std::io;

/// An implementation of `Read` that transforms JSON input where the outermost
/// structure is an array. The enclosing brackets and commas are removed,
/// causing the items to be adjacent to one another. This works with
/// [`serde_json::StreamDeserializer`].
pub(crate) struct ArrayStreamReader<T> {
    inner: T,
    depth: Option<usize>,
    inside_string: bool,
    escape_next: bool,
}

impl<T: io::Read> ArrayStreamReader<T> {
    pub(crate) fn new_buffered(inner: T) -> io::BufReader<Self> {
        io::BufReader::new(ArrayStreamReader {
            inner,
            depth: None,
            inside_string: false,
            escape_next: false,
        })
    }
}

#[inline]
fn do_copy(dst: &mut [u8], src: &[u8], len: usize) {
    if len == 1 {
        dst[0] = src[0]; // Avoids memcpy call.
    } else {
        dst[..len].copy_from_slice(&src[..len]);
    }
}

impl<T: io::Read> io::Read for ArrayStreamReader<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        let mut tmp = vec![0u8; buf.len()];

        // The outer loop is here in case every byte was skipped, which can happen
        // easily if `buf.len()` is 1. In this situation, the operation is retried
        // until either no bytes are read from the inner stream, or at least 1 byte
        // is written to `buf`.
        loop {
            let byte_count = self.inner.read(&mut tmp)?;
            if byte_count == 0 {
                return if self.depth.is_some() {
                    Err(io::ErrorKind::UnexpectedEof.into())
                } else {
                    Ok(0)
                };
            }

            let mut tmp_pos = 0;
            let mut buf_pos = 0;
            for (i, b) in tmp.iter().cloned().enumerate() {
                if self.depth.is_none() {
                    match b {
                        b'[' => {
                            tmp_pos = i + 1;
                            self.depth = Some(0);
                        },
                        b if b.is_ascii_whitespace() => {},
                        b'\0' => break,
                        _ => return Err(io::ErrorKind::InvalidData.into()),
                    }
                    continue;
                }

                if self.inside_string {
                    match b {
                        _ if self.escape_next => self.escape_next = false,
                        b'\\' => self.escape_next = true,
                        b'"' if !self.escape_next => self.inside_string = false,
                        _ => {},
                    }
                    continue;
                }

                let depth = self.depth.unwrap();
                match b {
                    b'[' | b'{' => self.depth = Some(depth + 1),
                    b']' | b'}' if depth > 0 => self.depth = Some(depth - 1),
                    b'"' => self.inside_string = true,
                    b'}' if depth == 0 => return Err(io::ErrorKind::InvalidData.into()),
                    b',' | b']' if depth == 0 => {
                        let len = i - tmp_pos;
                        do_copy(&mut buf[buf_pos..], &tmp[tmp_pos..], len);
                        tmp_pos = i + 1;
                        buf_pos += len;

                        // Then write a space to separate items.
                        buf[buf_pos] = b' ';
                        buf_pos += 1;

                        if b == b']' {
                            // Reached the end of outer array. If another array
                            // follows, the stream will continue.
                            self.depth = None;
                        }
                    },
                    _ => {},
                }
            }

            if tmp_pos < byte_count {
                let len = byte_count - tmp_pos;
                do_copy(&mut buf[buf_pos..], &tmp[tmp_pos..], len);
                buf_pos += len;
            }

            if buf_pos > 0 {
                // If at least some data was read, return with the amount. Otherwise, the outer
                // loop will try again.
                return Ok(buf_pos);
            }
        }
    }
}

它是这样使用的:

use std::io;

use serde::Deserialize;

#[derive(Deserialize)]
struct Item {
    name: String,
}

fn main() -> io::Result<()> {
    let json = br#"[{"name": "foo"}, {"name": "bar"}]"#;
    let wrapped = ArrayStreamReader::new_buffered(&json[..]);
    let first_item: Item = serde_json::Deserializer::from_reader(wrapped)
        .into_iter()
        .next()
        .unwrap()?;
    assert_eq!(first_item.name, "foo");
    Ok(())
}

最后,一个问题

必须有更好的方法来做到这一点,对吧?

4

0 回答 0