6

我创建了一个类,其目的是抽象出对队列的并发访问控制。

该类被设计为在单个线程上实例化,由多个线程写入,然后从后续的单个线程中读取。

我在类中生成了一个长时间运行的任务,如果项目成功出列,它将执行阻塞循环并触发事件。

我的问题是:我是否执行取消长时间运行的任务并随后清理/重置CancellationTokenSource对象的正确使用?

理想情况下,我希望一个活动对象能够被停止和重新启动,同时保持可用性以添加到队列中。

我以 Peter Bromberg 的文章为基础:Producer/Consumer Queue and BlockingCollection in C# 4.0

下面的代码:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    public delegate void DeliverNextQueuedItemHandler<T>(T item);

public sealed class SOQueueManagerT<T> 
{

    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning { get; private set; }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public CancellationTokenSource CancellationTokenSource
    {
        get
        {
            if (_canceller == null)
                _canceller = new CancellationTokenSource();

            return _canceller;
        }
    }

    public SOQueueManagerT()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);

        IsRunning = false;
    }

    public void Start()
    {
        if (_listener == null)
        {


            IsRunning = true;

            _listener = Task.Factory.StartNew(() =>
            {

                while (!CancellationTokenSource.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {

                            OnNextItem(item);
                        }
                    }

                }
            },
            CancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            CancellationTokenSource.Cancel();
            CleanUp();
        }
    }

    public void Add(T item)
    {
        _queue.Add(item);
    }

    private void CleanUp()
    {
        _listener.Wait(2000);
        if (_listener.IsCompleted)
        {
            IsRunning = false;
            _listener = null;
            _canceller = null;
        }
    }


 }
}

更新 这就是我最后所做的。它并不完美,但到目前为止正在完成这项工作。

public sealed class TaskQueueManager<T> 
{
    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning
    {
        get
        {
            if (_listener == null)
                return false;
            else if (_listener.Status == TaskStatus.Running ||
                _listener.Status == TaskStatus.Created ||
                _listener.Status == TaskStatus.WaitingForActivation ||
                _listener.Status == TaskStatus.WaitingToRun ||
                _listener.IsCanceled)
                return true;
            else
                return false;
        }
    }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public TaskQueueManager()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);
    }

    public void Start()
    {
        if (_listener == null)
        {
            _canceller = new CancellationTokenSource();

            _listener = Task.Factory.StartNew(() =>
            {
                while (!_canceller.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {
                            try
                            {
                                OnNextItem(item);
                            }
                            catch (Exception e)
                            {
                                //log or call an event
                            }
                        }
                    }
                }
            },
            _canceller.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            _canceller.Cancel();

            if (_listener.IsCanceled && !_listener.IsCompleted)
                _listener.Wait();

            _listener = null;
            _canceller = null;
        }
    }

    public void Add(T item)
    {
        if (item != null)
        {
            _queue.Add(item);
        }
        else
        {
            throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
        }
    }
}
4

1 回答 1

1

仔细的编程是唯一可以削减它的东西。即使您取消该操作,您也可能有一个未在很长一段时间内完成的待处理操作。这很可能是一个死锁的阻塞操作。在这种情况下,您的程序实际上不会终止。

例如,如果我多次调用您的 CleanUp 方法,或者没有先调用 Start,我会感觉它会崩溃。

清理期间的 2 秒超时,感觉比计划的更随意,我实际上会尽可能确保事情正确关闭或崩溃/挂起(你永远不想让并发的东西处于未知状态)。

此外,IsRunning是显式设置的,而不是从对象的状态中推断出来的。

为了获得灵感,我想让你看看我最近写的一个类似的类,它是一个生产者/消费者模式,它在后台线程中工作。您可以在CodePlex上找到该源代码。不过,这是为了解决一个非常具体的问题而设计的。

在这里,取消是通过对只有工作线程识别并因此开始关闭的特定类型进行排队来解决的。这也确保我永远不会取消待处理的工作,只考虑整个工作单元。

为了稍微改善这种情况,您可以为当前工作设置一个单独的计时器,如果它被取消,则中止或回滚未完成的工作。现在,实现类似事务的行为将需要一些试验和错误,因为您需要查看每一个可能的极端情况并问自己,如果程序在这里崩溃会发生什么?理想情况下,所有这些代码路径都会导致您可以恢复工作的可恢复或已知状态。但我想你已经猜到了,这需要仔细的编程和大量的测试。

于 2011-03-10T08:18:09.797 回答