0

在我的应用程序中,我有几个线程用于处理 TCP 连接(一个用于读取,一个用于发送,一个用于处理新的传入连接)。每个线程为所有客户端处理给定类型的操作,因此假设它向TcpClient不同 IP 上的 5 个实例发送数据。我使用 aBlockingCollection作为缓冲区,因为我从发送线程访问它,但也从另一个生成要发送的数据的线程访问它。我在发送线程中运行的函数如下所示:

    private void Sender()
    {
        while (run)
        {
            List<object[]> toRemove = new List<object[]>(); //bypass the fact that I cant remove a foreach iteration item from the BlockingCollection
            foreach (object[] obj in sendBuffer.ToList())
            {
                string IP = obj[0].ToString();
                string msg = obj[1].ToString();
                byte[] data = Encoding.UTF8.GetBytes(msg);
                foreach (TcpClient tcpc in TcpClients)
                {
                    if ((tcpc.Client.RemoteEndPoint as IPEndPoint).Address.ToString() == IP)
                    {
                        NetworkStream stream = tcpc.GetStream();
                        stream.Write(data, 0, data.Length);
                        break;
                    }
                }
                toRemove.Add(obj);
            }
            for (int i = 0; i < toRemove.Count; i++) //the bypass mentioned above
            {
                object[] rmv = toRemove[i];
                sendBuffer.TryTake(out rmv);
            }
        }
    }

注意:BlockingCollection使用的是 type <object[]>。我的问题是,在某个流量点,缓冲区开始填满。我在缓冲区中设置了最多 500 条消息的限制,它很容易溢出。现在,如果我理解正确(不确定),TryTake它会尝试删除该项目,如果此时正在使用该集合,它会等待并重试。(注意:我也尝试将超时设置为 50 毫秒)。如果这是真的(如果不是,请有人纠正我并提出不同的原因),问题可能是集合在大多数时间TryTake被调用时都很忙。会是这样吗?如果是,如何解决?

至于集合的使用,通过生成数据的线程,在迭代 1-80 个项目的 foreach 中,大约每 2 秒访问一次集合。缓冲区开始出现大约 20 多个项目的问题,直到那时,它都很好。发件人线程现在只发送给一个客户端,以后会达到 15 个。所以在高峰期,这将是 80 个项目 x 15 个用户 = 大约每 2 秒 1200 次访问。任何建议都非常感谢,谢谢。

4

1 回答 1

1

TryTake 不像您所描述的那样运行,默认 BlockingCollection 使用 ConcurrentQueue 来存储项目,并且 TryTake 会将队列中的下一个项目分配给提供的 out 引用。

例如

BlockingCollection<object[]> sendBuffer = new BlockingCollection<object[]>();

object[] message = new object[2];
object[] message2 = new object[2];
// Add messages to the queue
sendBuffer.Add(message);
sendBuffer.Add(message2);

object[] toSend;
// Take next message off the queue
sendBuffer.TryTake(out toSend);

// toSend === message

在您的情况下,您可以使用 BlockingCollection.Take() 等待消息发送:

BlockingCollection<object[]> sendBuffer = new BlockingCollection<object[]>();
// CancellationTokenSource is used in place of your run variable.
System.Threading.CancellationTokenSource cancellationSource 
= new System.Threading.CancellationTokenSource();

    private void Sender()
    {
        // Exit loop if cancellation requested
        while (!cancellationSource.Token.IsCancellationRequested)
        {

            object[] obj;

            try {
                // Blocks until an item is available in sendBuffer or cancellationSource.Cancel() is called.
                obj = sendBuffer.Take(cancellationSource.Token);
            } catch (OperationCanceledException) {
                // OperationCanceledException will be thrown if cancellationSource.Cancel() 
                // is called during call to sendBuffer.Take
                break;
            } catch (InvalidOperationException) {
                // An InvalidOperationException means that Take() was called on a completed collection.
                // See BlockingCollection<T>.CompleteAdding
                break;
            }

            string IP = obj[0].ToString();
            string msg = obj[1].ToString();
            byte[] data = Encoding.UTF8.GetBytes(msg);
            foreach (TcpClient tcpc in TcpClients) {
                if ((tcpc.Client.RemoteEndPoint as IPEndPoint).Address.ToString() == IP) {
                    NetworkStream stream = tcpc.GetStream();
                    stream.Write(data, 0, data.Length);
                    break;
                }
            }               
        }
    } 
于 2014-11-25T00:59:23.383 回答