16

假设我请求一个包含许多对象列表的大型 json 文件。我不希望它们一次全部进入内存,但我宁愿一个一个地阅读和处理它们。所以我需要将异步System.IO.Stream流转换为IAsyncEnumerable<T>. 如何使用新的System.Text.JsonAPI 来执行此操作?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
        }
    }
}
4

6 回答 6

15

TL;DR这不是微不足道的


看起来有人已经 发布了一个结构的完整代码Utf8JsonStreamReader,该结构从流中读取缓冲区并将它们提供给 Utf8JsonRreader,从而允许使用JsonSerializer.Deserialize<T>(ref newJsonReader, options);. 代码也不是微不足道的。相关问题在这里,答案在这里

但这还不够——HttpClient.GetAsync只有在收到整个响应后才会返回,基本上缓冲内存中的所有内容。

为避免这种情况,应将HttpClient.GetAsync(string,HttpCompletionOption )HttpCompletionOption.ResponseHeadersRead与.

反序列化循环也应该检查取消令牌,如果收到信号则退出或抛出。否则循环将继续,直到整个流被接收和处理。

此代码基于相关答案的示例,并使用HttpCompletionOption.ResponseHeadersRead和检查取消令牌。它可以解析包含适当项目数组的 JSON 字符串,例如:

[{"prop1":123},{"prop1":234}]

第一次调用jsonStreamReader.Read()移动到数组的开头,而第二次调用移动到第一个对象的开头。]当检测到数组 ( )的末尾时,循环本身终止。

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,                               
                                                       HttpCompletionOption.ResponseHeadersRead,  
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

JSON 片段,AKA 流式 JSON 又名 ...*

在事件流或日志记录场景中,将单个 JSON 对象附加到文件中是很常见的,每行一个元素,例如:

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

这不是一个有效的 JSON文档,但各个片段是有效的。这对于大数据/高并发场景有几个优势。添加新事件只需要在文件中追加一个新行,而不是解析和重建整个文件。处理,尤其是并行处理更容易,原因有二:

  • 可以一次检索单个元素,只需从流中读取一行即可。
  • 输入文件可以很容易地跨行边界进行分区和拆分,将每个部分提供给单独的工作进程,例如在 Hadoop 集群中,或者只是应用程序中的不同线程:计算分割点,例如通过将长度除以工作人员的数量,然后查找第一个换行符。将所有内容提供给单独的工人。

使用 StreamReader

执行此操作的 allocate-y 方法是使用 TextReader,一次读取一行并使用JsonSerializer.Deserialize对其进行解析:

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

这比反序列化适当数组的代码要简单得多。有两个问题:

  • ReadLineAsync不接受取消令牌
  • 每次迭代都会分配一个新字符串,这是我们希望通过使用 System.Text.Json避免的事情之一

这可能就足够了,因为尝试生成ReadOnlySpan<Byte>JsonSerializer.Deserialize 所需的缓冲区并非易事。

管道和序列读取器

为了避免分配,我们需要ReadOnlySpan<byte>从流中获取一个。这样做需要使用 System.IO.Pipeline 管道和SequenceReader结构。Steve Gordon 的An Introduction to SequenceReader解释了如何使用此类从使用分隔符的流中读取数据。

不幸的是,SequenceReader是一个 ref 结构,这意味着它不能在异步或本地方法中使用。这就是为什么史蒂夫戈登在他的文章中创建了一个

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

方法从 ReadOnlySequence 读取项目并返回结束位置,因此 PipeReader 可以从中恢复。不幸的是,我们想要返回一个 IEnumerable 或 IAsyncEnumerable,而迭代器方法也不喜欢inout参数。

我们可以在 List 或 Queue 中收集反序列化的项目并将它们作为单个结果返回,但这仍然会分配列表、缓冲区或节点,并且必须等待缓冲区中的所有项目在返回之前被反序列化:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

我们需要一些不需要迭代器方法的可枚举的东西,与异步一起工作并且不会以这种方式缓冲所有内容。

添加通道以生成 IAsyncEnumerable

ChannelReader.ReadAllAsync返回一个 IAsyncEnumerable。我们可以从不能用作迭代器的方法中返回 ChannelReader,并且仍然可以在没有缓存的情况下生成元素流。

调整 Steve Gordon 的代码以使用通道,我们得到 ReadItems(ChannelWriter...) 和ReadLastItem方法。第一个,一次读取一个项目,使用ReadOnlySpan<byte> itemBytes. 这可以被JsonSerializer.Deserialize. 如果ReadItems找不到分隔符,则返回其位置,以便 PipelineReader 可以从流中提取下一个块。

当我们到达最后一个块并且没有其他分隔符时,ReadLastItem` 读取剩余的字节并反序列化它们。

该代码几乎与 Steve Gordon 的相同。我们没有写入控制台,而是写入 ChannelWriter。

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);            
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;        
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item=JsonSerializer.Deserialize<T>(byteBuffer);
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }    
}

DeserializeToChannel<T>方法在流顶部创建一个管道读取器,创建一个通道并启动一个工作任务,该任务解析块并将它们推送到通道:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);    
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted) 
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete(); 
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReceiveAllAsync()可用于通过以下方式消耗所有物品IAsyncEnumerable<T>

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it 
}    
于 2019-10-30T08:45:08.237 回答
6

是的,在很多地方,真正的流式 JSON(反)序列化器将是一个很好的性能改进。

不幸的是,System.Text.Json在我写这篇文章的时候并没有这样做。我不确定将来是否会 - 我希望如此!JSON 的真正流式反序列化被证明是相当具有挑战性的。

也许您可以检查极快的Utf8Json 是否支持它。

但是,可能有针对您的特定情况的自定义解决方案,因为您的要求似乎限制了难度。

这个想法是一次手动从数组中读取一个项目。我们正在利用列表中的每个项目本身就是一个有效的 JSON 对象这一事实。

您可以手动跳过[(对于第一个项目)或,(对于每个下一个项目)。然后我认为你最好的选择是使用 .NET CoreUtf8JsonReader来确定当前对象的结束位置,并将扫描的字节提供给JsonDeserializer.

这样,您一次只能稍微缓冲一个对象。

而且由于我们正在谈论性能,因此您可以在使用PipeReader时从 a 中获取输入。:-)

于 2019-10-29T14:30:20.440 回答
4

我知道这是一篇旧帖子,但最近System.Text.Json support for IAsyncEnumerable在 .Net 6 Preview 4 中宣布的内容提供了 OP 中提到的问题的解决方案。

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {

            await foreach(var item in JsonSerializer.DeserializeAsyncEnumerable<T>(stream))
            {
                yield return item;
            }
        }
    }
}

这将提供按需反序列化,并且在处理大数据时非常有用。请注意,目前该功能仅限于根级 JSON 数组。

可以在此处找到有关该功能的更多详细信息

于 2021-05-26T23:24:25.547 回答
0

感觉就像您需要实现自己的流阅读器。您必须逐个读取字节并在对象定义完成后立即停止。这确实是相当低级的。因此,您不会将整个文件加载到 RAM 中,而是采取您正在处理的部分。这似乎是一个答案?

于 2019-10-29T15:40:56.877 回答
0

可以在 .NET 5 (C# 9)ChannelReader中使用System.IO.Pipelines扩展包,而不是使用多个任务,如下所示:System.Text.Json.JsonSerializer

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

class Program
{
    static readonly byte[] NewLineChars = {(byte)'\r', (byte)'\n'};
    static readonly byte[] WhiteSpaceChars = {(byte)'\r', (byte)'\n', (byte)' ', (byte)'\t'};

    private static async Task Main()
    {
        JsonSerializerOptions jsonOptions = new(JsonSerializerDefaults.Web);
        var json = "{\"some\":\"thing1\"}\r\n{\"some\":\"thing2\"}\r\n{\"some\":\"thing3\"}";
        var contentStream = new MemoryStream(Encoding.UTF8.GetBytes(json));
        var pipeReader = PipeReader.Create(contentStream);
        await foreach (var foo in ReadItemsAsync<Foo>(pipeReader, jsonOptions))
        {
            Console.WriteLine($"foo: {foo.Some}");
        }
    }

    static async IAsyncEnumerable<TValue> ReadItemsAsync<TValue>(PipeReader pipeReader, JsonSerializerOptions jsonOptions = null)
    {
        while (true)
        {
            var result = await pipeReader.ReadAsync();
            var buffer = result.Buffer;
            bool isCompleted = result.IsCompleted;
            SequencePosition bufferPosition = buffer.Start;
            while (true)
            {
                var(value, advanceSequence) = TryReadNextItem<TValue>(buffer, ref bufferPosition, isCompleted, jsonOptions);
                if (value != null)
                {
                    yield return value;
                }

                if (advanceSequence)
                {
                    pipeReader.AdvanceTo(bufferPosition, buffer.End); //advance our position in the pipe
                    break;
                }
            }

            if (isCompleted)
                yield break;
        }
    }

    static (TValue, bool) TryReadNextItem<TValue>(ReadOnlySequence<byte> sequence, ref SequencePosition sequencePosition, bool isCompleted, JsonSerializerOptions jsonOptions)
    {
        var reader = new SequenceReader<byte>(sequence.Slice(sequencePosition));
        while (!reader.End) // loop until we've come to the end or read an item
        {
            if (reader.TryReadToAny(out ReadOnlySpan<byte> itemBytes, NewLineChars, advancePastDelimiter: true))
            {
                sequencePosition = reader.Position;
                if (itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
                {
                    continue;
                }

                return (JsonSerializer.Deserialize<TValue>(itemBytes, jsonOptions), false);
            }
            else if (isCompleted)
            {
                // read last item
                var remainingReader = sequence.Slice(reader.Position);
                using var memoryOwner = MemoryPool<byte>.Shared.Rent((int)reader.Remaining);
                remainingReader.CopyTo(memoryOwner.Memory.Span);
                reader.Advance(remainingReader.Length); // advance reader to the end
                sequencePosition = reader.Position;
                if (!itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
                {
                    return (JsonSerializer.Deserialize<TValue>(memoryOwner.Memory.Span, jsonOptions), true);
                }
                else
                {
                    return (default, true);
                }
            }
            else
            {
                // no more items in sequence
                break;
            }
        }

        // PipeReader needs to read more
        return (default, true);
    }
}

public class Foo
{
    public string Some
    {
        get;
        set;
    }
}

https://dotnetfiddle.net/6j3KGg运行

于 2021-01-07T00:04:35.367 回答
-3

也许你可以使用Newtonsoft.Json序列化程序? https://www.newtonsoft.com/json/help/html/Performance.htm

尤其见部分:

优化内存使用

编辑

您可以尝试反序列化来自 JsonTextReader 的值,例如

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
    while (await reader.ReadAsync(cancellationToken))
    {
        yield return reader.Value;
    }
}
于 2019-10-29T14:41:02.360 回答