我正在使用 C# 通过 TCP 连接读取连续的数据流(ITCH 数据,即外汇价格),但在运行应用程序较长时间后,有时应用程序会丢弃数据包并且信息会丢失。
下面是我用来读取数据的代码片段:
private void ReaderThreadStarter()
{
StreamReader streamReader = new StreamReader(this._networkStream);
while (!_stopping)
{
try
{
if (this._networkStream.DataAvailable)
{
while ((line = streamReader.ReadLine()) != null)
{
lock (_queue.ConcurrentQueue)
{
byte[] data = System.Text.Encoding.ASCII.GetBytes(line);
Log.Info("Data Added in Queue: " + Encoding.ASCII.GetString(data, 0, data.Length));
_queue.WriteToQueue(data);
}
}
}
}
catch (Exception exception)
{
Log.Error(exception);
}
finally
{
SetStopped();
}
}
}
上面这段代码所做的是,它从 TCP 连接中读取数据并将其写入并发队列,然后另一个线程使用队列中的数据进行处理。所以基本上是一个简单的生产者-消费者问题。
生产者-消费者部分似乎工作正常,因为我在队列中写入的内容被消费者使用。
一种选择是使用嗅探器并确认应用程序正在丢弃数据包,但我在无法使用嗅探器的环境中工作。我相信有丢包的原因是因为对于我的一些外汇订单我从来没有被取消并且我的价格下跌并且数据提供商告诉我定价在那里是正确的。
我还在记录保存在队列中之前从 TCP 端口读取的数据,因此从日志中我假设数据在从连接中读取时丢失了。
有人可以告诉我我在这里可能做错了什么或丢弃数据包的原因是什么。
以下是我的消费者代码的代码片段:
public void ReadQueue()
{
try
{
while (true)
{
{
byte[] data = _queue.ReadFromQueue();
Parse(data);
}
}
}
catch (Exception exception)
{
Log.Error(exception);
}
}
public byte[] ReadFromQueue()
{
try
{
byte[] data;
lock (this) // Enter synchronization block
{
ConcurrentQueue.TryDequeue(out data);
}
return data;
}
catch (Exception exception)
{
Log.Error(exception);
return null;
}
}