2

我有我的 c# 表单运行两个线程,一个线程侦听数据进入,另一个正在处理数据,以便我可以使用它。由于某种原因,一旦进程线程启动,监听线程就不再执行。

Thread th1 = new Thread(new ThreadStart(zeroMQConn.Listen));
th1.Start();
Thread th2 = new Thread(() => ProcessData(zeroMQConn));
th2.Start();

当我调试它时,它开始 th1 进入它,然后 th2 开始,它永远不会回到 th1 并且我的数据返回 null。

public void Listen()
    {
        while (true)
        {
            try
            {
                byte[] zmqBuffer = new byte[102400];
                int messageLength;
                lockForZMQ.EnterWriteLock();
                messageLength = socket.Receive(zmqBuffer);
                lockForZMQ.ExitWriteLock();
                byte[] message = new byte[messageLength];
                Buffer.BlockCopy(zmqBuffer, 0, message, 0, messageLength);
                PriceBookData priceBook = PriceBookData.CreateBuilder().MergeFrom(message).Build();
                double Type = priceBook.GetPb(0).QuoteType;
                if (Type == 0.0)
                {
                    lockForList.EnterWriteLock();
                    CachedBidBooks = priceBook;
                    lockForList.ExitWriteLock();
                }
                else
                {
                    lockForList.EnterWriteLock();
                    CachedAskBooks = priceBook;
                    lockForList.ExitWriteLock();
                }
            }
            catch (ZmqException ex)
            {
                MessageBox.Show(ex.Message);
            }
        }
    }

 public void ProcessData(object connection)
    {
        while (true)
        {
            priceBookData = ((ZeroMQClass)connection).GetPriceBook();
        }

    }

 public List<PriceBookData> GetPriceBook()
    {
        List<PriceBookData> AskAndBid = new List<PriceBookData>();
        lockForList.EnterWriteLock();
        if (CachedAskBooks != null && CachedBidBooks != null)
        {
            AskAndBid.Add(CachedBidBooks);
            AskAndBid.Add(CachedAskBooks);
            CachedBidBooks = null;
            CachedAskBooks = null;
            lockForList.ExitWriteLock();
            return AskAndBid;
        }
        lockForList.ExitWriteLock();
        return null;
    }
4

1 回答 1

4

您在这里拥有的是生产者-消费者模型,但您没有正确同步它们。问题在于,您拥有一个变量,并且您完全同步了对该变量的访问,而不是某种类型的缓冲区或准备处理的数据集合。这意味着生产者永远不能在消费者工作时工作。

每当处理生产者/消费者队列时,这BlockingCollection<T>是一个很棒的类。

var queue = new BlockingCollection<PriceBookData>();

Task.Factory.StartNew(() =>
{
    while (true)
    {
        byte[] zmqBuffer = new byte[102400];
        int messageLength;
        socket.Receive(zmqBuffer);
        byte[] message = new byte[messageLength];
        Buffer.BlockCopy(zmqBuffer, 0, message, 0, messageLength);
        PriceBookData priceBook = PriceBookData.CreateBuilder().MergeFrom(message).Build();
        double Type = priceBook.GetPb(0).QuoteType;
        queue.Add(priceBook);
    }
}, TaskCreationOptions.LongRunning);

Task.Factory.StartNew(() =>
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        //do stuff with item
    }
}, TaskCreationOptions.LongRunning);
于 2013-03-18T16:35:08.573 回答