
In a situation with multiple "request threads" and one dumb "worker thread", the request threads have to queue up.

Consider two possibilities:

  1. Each request thread calls Monitor.Wait on its own dedicated object, which goes into a FIFO queue. When a result arrives, the oldest object is pulsed.

  2. All request threads take a number and call Monitor.Wait on a shared object. When a result arrives, Monitor.PulseAll is called on the shared object, and every request thread checks whether its number is up.

There may be other options, but for the purposes of this question please ignore them.

Questions - when there are many queued threads:

  • Is either approach significantly more CPU-efficient than the other?
  • Is either approach significantly more memory-efficient than the other?

The lock objects are just "new object()" instances.

My gut feeling is that scenario 1 is more efficient, since only one thread acts when a pulse happens, and a bare object instance is very light on resources (right?). However, I don't know much about the mechanics of Wait. Perhaps more resources are required when more objects are being "monitored"?

Thanks in advance for your insights.


I have written code below to illustrate the two scenarios.

Further clarification:

In my situation, the "worker" thread accepts work and produces results asynchronously. It has no idea which request a result belongs to, except that results are always produced in the same order the requests were received.

While I do have a real application for this, the question should be treated as academic. Please don't waste your time questioning the underlying premise or proposing alternative solutions. However, questions that clarify the intent of the question are welcome.

using System;
using System.Collections.Generic;
using System.Threading;

namespace praccmd.threads
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            TestResets();

            Console.WriteLine("press key");
            Console.ReadKey();
        }


        private static void TestResets()
        {
            //lock object per work request
            Console.WriteLine("----lock object per work request----");

            for (int i = 1; i <= 10; i++)
            {
                Thread t = new Thread(ThreadLockObjPerRequest);
                t.Name = "Thread_object_per_request_" + i;
                t.Start();
            }

            //now pretend to be the WorkDone event
            Thread.Sleep(100); //give the request threads time to enqueue their tickets
            while (true)
            {
                Thread.Sleep(50); //crude: assumes each waiter has reached Monitor.Wait by now
                lock (_receiveLock)
                {
                    object doneTicketNext;
                    lock (_ticketQueue) //the queue is shared with the request threads
                    {
                        if (_ticketQueue.Count == 0) break;
                        doneTicketNext = _ticketQueue.Dequeue();
                    }
                    lock (doneTicketNext)
                    {
                        Monitor.Pulse(doneTicketNext);
                        Monitor.Wait(doneTicketNext);
                    }
                }
            }

            //shared lock object (pulseall), one id per request
            Console.WriteLine("----shared lock object----");

            for (int i = 1; i <= 10; i++)
            {
                Thread t = new Thread(ThreadSharedLock);
                t.Name = "Thread_shared_lock_object_" + i;
                t.Start();
            }

            //now pretend to be the WorkDone event
            Thread.Sleep(100); //give the request threads time to enqueue their ticket numbers
            while (true)
            {
                Thread.Sleep(50);
                lock (_sharedReceiveLock)
                {
                    lock (_sharedLock)
                    {
                        lock (_ticketIncrementLock) //same lock that guards the enqueue
                        {
                            if (_ticketNumberQueue.Count == 0) break;
                            _sharedLock.TicketNumber = _ticketNumberQueue.Dequeue();
                        }
                        Monitor.PulseAll(_sharedLock);
                    }
                    lock (_sharedThanksLock) Monitor.Wait(_sharedThanksLock);
                }
            }
        }


        //infrastructure for lock-object-per-request
        private static readonly object _sendLock = new object();
        private static readonly object _receiveLock = new object();
        private static readonly Queue<object> _ticketQueue = new Queue<object>();

        private static object TakeATicket()
        {
            var ticket = new object();
            lock (_ticketQueue) _ticketQueue.Enqueue(ticket); //the queue is shared across threads
            return ticket;
        }

        //lock-object-per-request thread
        private static void ThreadLockObjPerRequest()
        {
            var name = Thread.CurrentThread.Name;

            object ticket;
            lock (_sendLock)
            {
                ticket = TakeATicket();
                //RequestWorkNonBlocking("some data specific to this request");
                Console.WriteLine(name + " sends its request.");
            }

            var myResult = string.Empty;
            lock (ticket)
            {
                Monitor.Wait(ticket);
                //myResult = GetResultFromAStaticVariable();
                Console.WriteLine(name + " gets its data.");
                Monitor.Pulse(ticket);
            }

            //do something with myResult
        }


        //infrastructure for shared-lock
        private class SharedLock { public int TicketNumber { get; set; } }

        private static readonly SharedLock _sharedLock = new SharedLock { TicketNumber = 0 };
        private static readonly object _sharedReceiveLock = new object();
        private static readonly object _sharedThanksLock = new object();
        private static readonly object _ticketIncrementLock = new object();
        private static int _ticketNumber = 0;
        private static readonly Queue<int> _ticketNumberQueue = new Queue<int>();

        private static int TakeATicketNumber()
        {
            lock (_ticketIncrementLock)
            {
                _ticketNumberQueue.Enqueue(++_ticketNumber);
                return _ticketNumber;
            }
        }

        //thread for shared-lock
        private static void ThreadSharedLock()
        {
            var name = Thread.CurrentThread.Name;

            int ticketNumber;
            lock (_sendLock)
            {
                ticketNumber = TakeATicketNumber();
                //RequestWorkNonBlocking("some data specific to this request");
                Console.WriteLine(name + " sends its request.");
            }

            var myResult = string.Empty;
            lock (_sharedLock)
            {
                //check the condition before and after each wait, so a pulse
                //that arrived early is not lost
                while (_sharedLock.TicketNumber != ticketNumber)
                {
                    Monitor.Wait(_sharedLock);
                }
                myResult = "response"; //GetResultFromAStaticVariable();
                Console.WriteLine(name + " gets its data.");
            }

            lock (_sharedThanksLock) Monitor.Pulse(_sharedThanksLock);

            //do something with myResult
        }

    }
}

1 Answer


Performance is always tricky, and will depend a lot on your specific context; you would probably have to measure it to get a good answer for that, noting that it probably depends on the number of expected outstanding tasks, etc.
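If it helps, a very rough measurement harness might look like the following. Everything here (PulseBench, TimeDedicated, TimeShared) is my own naming, not anything from the question or the framework, and the absolute numbers will vary wildly between machines and runs; it only sketches how the two schemes could be compared. Note the shared scheme wakes every waiter on each PulseAll, so the total number of wakeups grows quadratically with the queue depth:

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class PulseBench
{
    //Scheme 1: each waiter blocks on its own dedicated ticket object.
    public static TimeSpan TimeDedicated(int n)
    {
        var tickets = new object[n];
        var signaled = new bool[n]; //guards against a lost Pulse
        var done = new CountdownEvent(n);
        for (int i = 0; i < n; i++)
        {
            tickets[i] = new object();
            int idx = i;
            new Thread(() =>
            {
                lock (tickets[idx])
                    while (!signaled[idx]) Monitor.Wait(tickets[idx]);
                done.Signal();
            }) { IsBackground = true }.Start();
        }
        var sw = Stopwatch.StartNew();
        for (int i = 0; i < n; i++)
            lock (tickets[i]) { signaled[i] = true; Monitor.Pulse(tickets[i]); }
        done.Wait();
        return sw.Elapsed;
    }

    //Scheme 2: all waiters share one object and re-check a ticket number
    //after every PulseAll.
    public static TimeSpan TimeShared(int n)
    {
        var shared = new object();
        int current = 0;
        var done = new CountdownEvent(n);
        for (int i = 0; i < n; i++)
        {
            int ticket = i + 1;
            new Thread(() =>
            {
                lock (shared)
                    while (current < ticket) Monitor.Wait(shared);
                done.Signal();
            }) { IsBackground = true }.Start();
        }
        var sw = Stopwatch.StartNew();
        for (int i = 1; i <= n; i++)
            lock (shared) { current = i; Monitor.PulseAll(shared); }
        done.Wait();
        return sw.Elapsed;
    }

    public static void Main()
    {
        Console.WriteLine("dedicated: " + TimeDedicated(100));
        Console.WriteLine("shared:    " + TimeShared(100));
    }
}
```

The timings include thread start-up overlap, so treat them only as a relative, order-of-magnitude comparison.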

The way I work this multiplexer scenario is with the Task API: each new incoming request creates a TaskCompletionSource<T>, which is enqueued (synchronized) into a queue. Let's say each result (when it arrives later) is an int:

private readonly Queue<TaskCompletionSource<int>> queue
            = new Queue<TaskCompletionSource<int>>();

public Task<int> MakeRequest(...) {
    var source = new TaskCompletionSource<int>();
    lock(queue) {
        queue.Enqueue(source);
    }
    return source.Task;
}

and then the worker, as results come in, can do something like:

private void SetNextResult(int value) {
    TaskCompletionSource<int> source;
    lock(queue) {
        source = queue.Dequeue();
    }
    source.SetResult(value);
}

The nice thing about this is that it allows each individual caller to decide how they want to respond to the delayed work:

  • they can use .Wait / .Result to block
  • they can use .ContinueWith to add a callback
  • they can use await to use a state-machine-based continuation
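All three styles can be seen end-to-end in a small self-contained demo; Multiplexer and Demo are hypothetical names of mine wrapping the two snippets above:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Multiplexer
{
    private readonly Queue<TaskCompletionSource<int>> queue
        = new Queue<TaskCompletionSource<int>>();

    public Task<int> MakeRequest()
    {
        var source = new TaskCompletionSource<int>();
        lock (queue) queue.Enqueue(source);
        return source.Task;
    }

    public void SetNextResult(int value)
    {
        TaskCompletionSource<int> source;
        lock (queue) source = queue.Dequeue();
        source.SetResult(value); //completes the oldest outstanding request
    }
}

public static class Demo
{
    public static async Task Main()
    {
        var mux = new Multiplexer();

        Task<int> first = mux.MakeRequest();
        Task<int> second = mux.MakeRequest();
        Task<int> third = mux.MakeRequest();
        third.ContinueWith(t => Console.WriteLine("callback saw " + t.Result));

        //the "worker" hands results back strictly in request order
        mux.SetNextResult(1);
        mux.SetNextResult(2);
        mux.SetNextResult(3);

        Console.WriteLine("await saw " + await first);      //1
        Console.WriteLine(".Result saw " + second.Result);  //2
    }
}
```

Because the queue is FIFO, the first task always receives the first result, regardless of how each caller chooses to consume it.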
answered 2013-06-04T07:23:47.290