4

努力寻找遇到类似问题或类似问题的人。

reader.ReadLine()我目前正在使用具有 GZip 要求的 http (json) 流,并且从发送数据到读取数据时遇到延迟。有人建议我这可能与将数据保存在缓冲区中的解码有关?

这就是我目前所拥有的,除了延迟之外它工作得很好。

HttpWebRequest request = (HttpWebRequest)WebRequest.Create(endPoint);
request.Method = "GET";

request.PreAuthenticate = true;
request.Credentials = new NetworkCredential(username, password);

request.AutomaticDecompression = DecompressionMethods.GZip;
request.ContentType = "application/json";
request.Accept = "application/json";
request.Timeout = 30;
request.BeginGetResponse(AsyncCallback, request);

然后在 AsyncCallback 方法中我有:

HttpWebRequest request = result.AsyncState as HttpWebRequest;

using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8))
{

    while (!reader.EndOfStream)
    {
        string line = reader.ReadLine();
        if (string.IsNullOrWhiteSpace(line)) continue;

        Console.WriteLine(line);
    }
}

它会一直等待,reader.Readline()直到收到更多数据,然后甚至会保留其中的一些数据。还收到了保持活动的换行符,当它决定阅读某些内容时,这些通常会被一次性读出。

我已经在运行 curl 命令的情况下测试了并行运行的流,curl 命令可以很好地接收和解压缩数据。

任何见解都会很棒。谢谢,

编辑 在流式阅读器上使用缓冲区大小没有运气。

new StreamReader(stream, Encoding.UTF8, true, 1)

编辑 也没有运气更新到 .NET 4.5 并使用

request.AllowReadStreamBuffering = false;
4

3 回答 3

5

更新:这似乎在很长一段时间内会出现问题,并且体积率较高,并且应该仅用于缓冲区影响应用程序功能的小体积。从那以后,我切换回了StreamReader.

所以这就是我最终想出的。这有效,没有延迟。这不会被自动 GZip 解压缩缓冲。

using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
using (MemoryStream memory = new MemoryStream())
using (GZipStream gzip = new GZipStream(memory, CompressionMode.Decompress))
{
    byte[] compressedBuffer = new byte[8192];
    byte[] uncompressedBuffer = new byte[8192];
    List<byte> output = new List<byte>();

    while (stream.CanRead)
    {
        int readCount = stream.Read(compressedBuffer, 0, compressedBuffer.Length);

        memory.Write(compressedBuffer.Take(readCount).ToArray(), 0, readCount);
        memory.Position = 0;

        int uncompressedLength = gzip.Read(uncompressedBuffer, 0, uncompressedBuffer.Length);

        output.AddRange(uncompressedBuffer.Take(uncompressedLength));

        if (!output.Contains(0x0A)) continue;

        byte[] bytesToDecode = output.Take(output.LastIndexOf(0x0A) + 1).ToArray();
        string outputString = Encoding.UTF8.GetString(bytesToDecode);
        output.RemoveRange(0, bytesToDecode.Length);

        string[] lines = outputString.Split(new[] { Environment.NewLine }, new StringSplitOptions());
        for (int i = 0; i < (lines.Length - 1); i++)
        {
            Console.WriteLine(lines[i]);
        }

        memory.SetLength(0);
    }
}
于 2013-02-08T17:31:49.943 回答
1

延迟的 ACK C.Evenhuis 讨论的内容可能有些问题,但我有一种奇怪的直觉,它StreamReader让你头疼……你可以尝试这样的事情:

public void AsyncCallback(IAsyncResult result)
{
    HttpWebRequest request = result.AsyncState as HttpWebRequest;   
    using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
    using (Stream stream = response.GetResponseStream())
    {
        var buffer = new byte[2048];
        while(stream.CanRead)
        {
            var readCount = stream.Read(buffer, 0, buffer.Length);
            var line = Encoding.UTF8.GetString(buffer.Take(readCount).ToArray());
            Console.WriteLine(line);
        }
    }
}

编辑:这是我用来测试这个理论的完整工具(也许与你的情况不同会突然出现)

(LINQPad 就绪)

void Main()
{
    Task.Factory.StartNew(() => Listener());
    _blocker.WaitOne();
    Request();
}

public bool _running;
public ManualResetEvent _blocker = new ManualResetEvent(false);

public void Listener()
{
    var listener = new HttpListener();
    listener.Prefixes.Add("http://localhost:8080/");
    listener.Start();
    "Listener is listening...".Dump();;
    _running = true;
    _blocker.Set();
    var ctx = listener.GetContext();
    "Listener got context".Dump();
    ctx.Response.KeepAlive = true;
    ctx.Response.ContentType = "application/json";
    var outputStream = ctx.Response.OutputStream;
    using(var zipStream = new GZipStream(outputStream, CompressionMode.Compress))
    using(var writer = new StreamWriter(outputStream))
    {
        var lineCount = 0;
        while(_running && lineCount++ < 10)
        {
            writer.WriteLine("{ \"foo\": \"bar\"}");
            "Listener wrote line, taking a nap...".Dump();
            writer.Flush();
            Thread.Sleep(1000);
        }
    }
    listener.Stop();
}

public void Request()
{
    var endPoint = "http://localhost:8080";
    var username = "";
    var password = "";
    HttpWebRequest request = (HttpWebRequest)WebRequest.Create(endPoint);
    request.Method = "GET";

    request.PreAuthenticate = true;
    request.Credentials = new NetworkCredential(username, password);

    request.AutomaticDecompression = DecompressionMethods.GZip;
    request.ContentType = "application/json";
    request.Accept = "application/json";
    request.Timeout = 30;
    request.BeginGetResponse(AsyncCallback, request);
}

public void AsyncCallback(IAsyncResult result)
{
    Console.WriteLine("In AsyncCallback");    
    HttpWebRequest request = result.AsyncState as HttpWebRequest;    
    using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
    using (Stream stream = response.GetResponseStream())
    {
        while(stream.CanRead)
        {
            var buffer = new byte[2048];
            var readCount = stream.Read(buffer, 0, buffer.Length);
            var line = Encoding.UTF8.GetString(buffer.Take(readCount).ToArray());
            Console.WriteLine("Reader got:" + line);
        }
    }
}

输出:

Listener is listening...
Listener got context
Listener wrote line, taking a nap...
In AsyncCallback
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}

Listener wrote line, taking a nap...
Reader got:{ "foo": "bar"}
于 2013-02-07T21:08:11.920 回答
0

这可能与延迟 ACK结合 Nagle 算法有关。当服务器连续发送多个小响应时会发生这种情况。

在服务器端,发送第一个响应,但只有在服务器收到来自客户端的 ACK 后,或者直到有足够的数据可以发送大数据包(Nagle 算法)时,才会发送后续响应数据块。

在客户端,收到响应的第一位,但不会立即发送 ACK - 由于传统应用程序具有请求-响应-请求-响应行为,它假设它可以将 ACK 与下一个请求一起发送 - 在你的情况不会发生。

在一段固定的时间(500 毫秒?)之后,它决定发送 ACK,导致服务器发送它迄今为止积累的下一个包。

问题(如果这确实是您遇到的问题)可以通过设置NoDelay属性在服务器端在套接字级别修复,禁用 Nagle 算法。我认为您也可以在整个操作系统范围内禁用它。

您还可以在客户端暂时禁用延迟 ACK(我知道 windows 有一个注册表项)以查看这是否确实是问题所在,而无需更改服务器上的任何内容。延迟 ACK 可防止 DDOS 攻击,因此请确保事后恢复设置。

不那么频繁地发送 keepalive 也可能会有所帮助,但您仍然有可能出现问题。

于 2013-02-07T20:53:14.823 回答