I have a Stream containing a collection of objects serialized into JSON, for example:

{"Name":"Alice","Age":20}{"Name":"Bob","Age":30}{"Name":"Charlie","Age":35}{"Name":"Danielle","Age":50}...

This stream may be very large, so I want to process the items as they come in rather than reading the entire stream into a string. Also, if the stream is a network stream, it may be open indefinitely.

Some of the objects may be quite complex so I would like to take advantage of the JSON serialization provided by JSON.NET.

Is there a way I can use Newtonsoft Json to convert the stream into an IObservable<Person> and process the items reactively as they arrive?

1 Answer

If you could modify your input so that it looks like a JSON array, then you could use JsonTextReader directly to read the array parts and then use JsonSerializer to read the objects inside the array.

This way, the results can be streamed item by item, and you don't have to handle the deserialization of each object yourself.

The method producing an IObservable could look something like:

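using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Newtonsoft.Json;

// Extension method: declare it inside a static class.
public static IObservable<T> DeserializeAsObservable<T>(this TextReader reader)
{
    var jsonReader = new JsonTextReader(reader);

    // The input must start with '[' so the items can be read one at a time.
    if (!jsonReader.Read() || jsonReader.TokenType != JsonToken.StartArray)
        throw new Exception();

    var serializer = new JsonSerializer();

    return Observable.Create<T>(o => 
        {
            while (true)
            {
                // The input ended before the closing ']' was reached.
                if (!jsonReader.Read())
                {
                    o.OnError(new Exception());
                    return Disposable.Empty; // stop: nothing may follow OnError
                }
                if (jsonReader.TokenType == JsonToken.EndArray)
                    break;
                // The reader now sits on the first token of the next item.
                var obj = serializer.Deserialize<T>(jsonReader);
                o.OnNext(obj);
            }
            o.OnCompleted();
            return Disposable.Empty;
        });
}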

(Obviously use this only after replacing new Exception() with a more suitable exception type.)
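
If changing the input into an array is not an option, Json.NET's JsonTextReader also has a SupportMultipleContent property that lets one reader walk through several root-level values in a row. The following is only a minimal sketch of that alternative, not the method shown above: it assumes a Json.NET version that has this property, the ConcatenatedJson class and ReadMany helper are hypothetical names, and Person is modeled on the sample data in the question.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;

// Assumed shape of the items, taken from the sample data in the question.
public class Person
{
    public string Name { get; set; }
    public int Age { get; set; }
}

public static class ConcatenatedJson
{
    // Hypothetical helper: lazily yields one T per top-level JSON value.
    public static IEnumerable<T> ReadMany<T>(TextReader reader)
    {
        var jsonReader = new JsonTextReader(reader)
        {
            // Accept several root-level values in one stream instead of
            // failing once the first object has ended.
            SupportMultipleContent = true
        };
        var serializer = new JsonSerializer();

        // Each Read() moves to the first token of the next top-level value
        // and returns false when the underlying reader is exhausted.
        while (jsonReader.Read())
        {
            yield return serializer.Deserialize<T>(jsonReader);
        }
    }
}
```

Because the sequence is lazy, items are produced as they are read, for example with foreach (var p in ConcatenatedJson.ReadMany<Person>(new StreamReader(stream))). If an IObservable<Person> is needed, the Rx ToObservable() extension can wrap the sequence; which scheduler to run it on is left open here.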

Answered 2013-03-30T16:13:08.933