1

假设我有一个任务队列,每个任务都有一个控制共享资源访问的锁定对象(syncObject),队列可以有多个共享相同同步对象实例的任务。而且我有 N 个并发线程应该使任务出队并按队列顺序处理它们,这意味着按队列顺序获取同步对象的锁定。

代码说明:

abstract class Task
{
    public readonly Object SyncObject = new Object();
}

Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();

void TakeItemThreadedMethod()
{
    Task task;
    lock(queueLock) task = taskQueue.Dequeue();
    //Between this lines is my problem,
    //Other thread can dequeue next task and it may share same syncObject and
    //acquire lock on it before this thread, thought this task was first in queue
    lock(task.SyncObject)
    {
        //Do some work here
    }
}

如何开始处理按队列中的顺序共享相同 SyncObject 的任务(获取 Task.SyncObject 锁)。

4

3 回答 3

2

这种方法怎么样:

  • 使用列表而不是队列
  • 让每个工作线程按顺序循环通过队列,直到找到“解锁”任务

类似(未经测试):

abstract class Task
{
    public readonly Object SyncObject = new Object();
}

List<Task> taskList = new List<Task>();

void TakeItemThreadedMethod()
{
    Task task = null;
    bool found = false;

    try
    {
        // loop until found an task whose SyncObject is free
        while (!found)
        {
            lock (taskList)
            {
                for (int i = 0; i < taskList.Count; i++)
                {
                    object syncObj = taskList[i].SyncObject;
                    if (found = Monitor.TryEnter(syncObj))
                    {
                        for (int x = 0; x < taskList.Count; x++)
                        {
                            if (Object.ReferenceEquals(
                                    syncObj, taskList[x].SyncObject))
                            {
                                task = taskList[x];
                                taskList.RemoveAt(x);
                                break;
                            }
                        }
                        break;
                    }
                }
            }
        }

        // process the task...
        DoWork(task);
    }
    finally
    {
        if (found) Monitor.Exit(task.SyncObject);
    }
}

void QueueTask(Task task)
{
    lock (taskList)
    {
        taskList.Add(task);
    }
}
于 2012-11-01T08:37:43.213 回答
2

听起来您的队列可能不应该包含单个任务-而是任务队列,其中每个子队列都是“共享同步锁的所有任务”。

因此,您的处理器将:

  • 从主队列中取出一个子队列
  • 从子队列中取出第一个任务并处理它
  • 完成后,将子队列放回主队列的末尾(或任何地方,实际上 - 确定您希望调度如何工作)

这将确保每个子队列一次只执行一个任务。

您可能需要从锁到子队列的映射,以便任何创建工作都可以将其添加到正确的子队列中。假设您完全需要该功能,您需要自动计算何时从地图中删除子队列(而不是将其放回主队列)。

编辑:作为对上述内容的优化,您可以将子队列本身放入您用作共享同步锁的任何内容中。它可以引用“一次执行的单个任务”“任务队列”——只是懒惰地创建队列。然后,您将同步锁(实际上不再需要用作锁)放在队列中,每个消费者只会要求它执行下一个任务。如果只有一个任务可用,则返回它(并将“下一个任务”变量设置为 null)。如果有多个任务可用,则将第一个任务出列。

生产者添加新任务时,如果“第一个任务”变量之前为空,则将其设置为要执行的任务,或者如果没有队列但已经是任务,则创建队列,或者队列是如果已经存在,则添加。这解决了不必要的队列创建的低效率。

同样,棘手的部分将是解决如何以原子方式丢弃共享资源锁 - 因为您只想在处理最后一项这样做,但同样您不想错过任务,因为您碰巧将它添加到错误的时间。它不应该太糟糕,但同样你需要仔细考虑它。

于 2012-11-01T07:15:47.840 回答
1

我使用了 Matthew Brindley 建议的 QueuedLock 类,稍作修改后,我将 Enter 函数拆分为 TakeTicket 和 Enter 哪些块。

现在我可以在共享 QueueLock 中使用 TakeTicket 而不会阻塞整个队列。

修改后的代码:

abstract class Task
{
    public readonly QueuedLock SyncObject = new QueuedLock();
}

Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();

void TakeItemThreadedMethod()
{
    Task task;
    int ticket;
    lock(queueLock) 
    {
        task = taskQueue.Dequeue();
        ticket = task.SyncObject.TakeTicket();
    }
    task.SyncObject.Enter(ticket);
    //Do some work here
    task.SyncObject.Exit();
}
于 2012-11-01T08:28:52.090 回答