0

我正在创建一个控制台应用程序,它应该从网络接收消息以便处理它们。首先我创建了一个单例类来确保所有类都可以访问同一个队列:这个类是ProcessingQueue.

public class ProcessingQueue
{
    public class ItemToProcess
    {
        public string SourceClientId { get; set; }
        public IMessage ReceivedMessage { get; set; }
    }

    private int m_MaxSize = 20;
    private Queue<ItemToProcess> m_InternalQueue;

    private static volatile ProcessingQueue m_Instance = null;
    private static readonly object syncRoot = new object();

    private ProcessingQueue()
    {
        m_InternalQueue = new Queue<ItemToProcess>();
    }

    public static ProcessingQueue Instance
    {
        get
        {
            if (m_Instance == null)
            {
                lock (syncRoot)
                {
                    if (m_Instance == null)
                    {
                        m_Instance = new ProcessingQueue();
                    }
                }
            }
            return m_Instance;
        }
    }

    public int MaxSize
    {
        get
        {
            lock (syncRoot)
            {
                return m_MaxSize;
            }
        }
        set
        {
            if (value > 0)
            {
                lock (syncRoot)
                {
                    m_MaxSize = value;
                }
            }
        }
    }

    public void Enqueue(String source, IMessage message)
    {
        lock (syncRoot)
        {
            while (m_InternalQueue.Count >= m_MaxSize)
            {
                Monitor.Wait(syncRoot);
            }
            m_InternalQueue.Enqueue(new ItemToProcess { SourceClientId = source, ReceivedMessage = message });
            if (m_InternalQueue.Count == 1)
            {
                Monitor.PulseAll(syncRoot);
            }
        }
    }

    public ItemToProcess Dequeue()
    {
        lock (syncRoot)
        {
            while (m_InternalQueue.Count == 0)
            {
                Monitor.Wait(syncRoot);
            }
            ItemToProcess item = m_InternalQueue.Dequeue();
            if (m_InternalQueue.Count == m_MaxSize - 1)
            {
                Monitor.PulseAll(syncRoot);
            }
            return item;
        }
    }

    public int Count
    {
        get
        {
            lock (syncRoot)
            {
                return m_InternalQueue.Count;
            }
        }
    }
}

然后我实现了项目的主类如下。

  1. 首先,共享队列被实例化。
  2. 然后我设置了一个计时器来模拟保活消息(第一个生产者)的到达。
  3. 然后我创建了消费者线程(processing对象)。
  4. 然后我创建了另一个生产者线程(generating对象)。
  5. 最后,我运行所有线程和计时器。

    类程序{静态处理队列队列=处理队列.实例;静态 System.Timers.Timer keep_alive_timer = new System.Timers.Timer(10000);

    private static volatile bool running = true;
    
    
    static void Main(string[] args)
    {
        queue.MaxSize = 30;
        keep_alive_timer.Elapsed += new ElapsedEventHandler(delegate(object sender, ElapsedEventArgs e)
        {
            KeepAliveMessage msg = new KeepAliveMessage(Guid.NewGuid());
            Console.WriteLine("Keep Alive: " + msg.MsgId);
            queue.Enqueue("", msg);
        });
        keep_alive_timer.Enabled = true;
        keep_alive_timer.AutoReset = true;
    
        Thread processing = new Thread(delegate()
        {
            while (running)
            {
                Console.WriteLine("Number of elements in queue: {0}", queue.Count);
    
                ProcessingQueue.ItemToProcess msg = queue.Dequeue();
                Console.WriteLine("Processed: msgid={0}, source={1};", msg.ReceivedMessage.MsgId, msg.SourceClientId);
    
                Thread.Sleep(1500);
            }
        });
    
        Thread generating = new Thread(MessagesFromNetwork);
    
        processing.Start();
        keep_alive_timer.Start();
        generating.Start();
    
        Console.WriteLine("RUNNING...\n");
        Console.ReadLine();
    
        running = false;
        keep_alive_timer.Stop();
        Console.WriteLine("CLOSING...\n");
    
        //processing.Abort();
        //generating.Abort();
    
        bool b1 = processing.Join(TimeSpan.FromSeconds(5));
        bool b2 = generating.Join(TimeSpan.FromSeconds(5));
    
        Console.WriteLine("b1 {0}", b1);
        Console.WriteLine("b2 {0}", b2);
        Console.WriteLine("END");
        Console.ReadLine();
    }
    
    static void MessagesFromNetwork()
    {
        string[] sourceClients = { "1", "2", "3", "4", "5" };
        while (running)
        {
            IMessage msg; // interface IMessage
            Random random = new Random();
            int type = random.Next(2);
            switch (type)
            {
                case 0:
                    msg = new KeepAliveMessage(Guid.NewGuid());   // implements IMessage
                    break;
                case 1:
                    msg = new TaskMessage(Guid.NewGuid(), ...);   // implements IMessage
                    break;
                default:
                    throw new Exception("Messaggio non valido!");
            }
            Console.WriteLine("New message received: " + msg.MsgId);
            queue.Enqueue(sourceClients[random.Next(sourceClients.Length)], msg);
            Console.WriteLine("... message enqueued: " + msg.MsgId);
            Thread.Sleep(500);
        }
    }
    

    }

在执行过程中按Enterrunning ,变量变为 false,两个线程都应该终止。然而这并不总是发生,事实上这两种方法之一Join没有返回控制:出于这个原因,我在Join方法中指定了一个超时,但是在Console.WriteLine("END");控制台应用程序冻结之后(第二个Join返回false)。

也许第二个线程没有正确终止......为什么?

4

1 回答 1

1

似乎 Dequeue 或 Enqueue 可以进入 a Monitor.Wait(),当运行停止时,没有人脉动。

您等待 5 秒,但请注意 MaxSize * 1500 > 5000

我无法直接找出定时器频率。

于 2012-07-01T22:53:39.513 回答