2

我正在构建一个通用的 URI 检索系统。本质上,有一个通用类Retriever<T>,它维护一个要检索的 URI 队列。它有一个单独的线程,可以尽可能快地处理该队列。如问题标题所示,URI 类型的一个示例是 HTTP 类型 URI。

问题是,当我开始通过抽象方法 T RetrieveResource(Uri location) 请求检索资源时,由于该方法缺乏异步性,整个处理过程会变慢。

我的第一个想法是把 RetrieveResource 的返回类型更改为 Task<T>。但是,当我们有数千个未完成的任务时,这似乎会使任务堆积起来并导致很多问题。它似乎创建了许多实际线程,而不是利用线程池。我想这只会减慢一切,因为同时进行的事情太多了,以至于没有任何一项单独的工作能取得明显进展。

预计我们将有大量排队等待检索的项目,并且它们的处理速度跟不上入队的速度。随着时间的推移,系统有机会赶上;但这绝对不会很快。

我还考虑过不再维护一个队列和一个专门处理它的线程,而是直接把工作项排入 ThreadPool。但是,如果我需要在所有工作项处理完之前关闭系统,或者以后想要支持优先级排序等,我不确定这是否理想。

我们也知道检索资源是一个耗时的过程(0.250 - 5 秒),但不一定是资源密集型过程。我们可以将其并行化为数百个请求。

我们的要求是:

  • URI 可以从任何线程入队,即使系统正在处理队列
  • 检索可能需要稍后能够被优先处理
  • 检索应该可以暂停
  • 当没有任何内容需要检索时,应尽量减少空转(自旋等待)(BlockingCollection 在这里很有用)。

有没有一种在不引入不必要的复杂性的情况下并行化的好方法?

下面是我们现有的一些代码,作为示例。

/// <summary>
/// Base class for a retriever that queues URIs and resolves them, one at a time,
/// on a single dedicated worker thread by calling <see cref="RetrieveResource"/>.
/// </summary>
/// <typeparam name="T">Type of the value produced for each retrieved URI.</typeparam>
/// <remarks>
/// The private <c>pending</c> collection doubles as the monitor object used to park
/// the worker while the retriever is stopped. Every <c>Monitor.Wait</c>/<c>Pulse</c>
/// call is made while holding that monitor; calling them without owning the lock
/// throws <see cref="SynchronizationLockException"/>.
/// </remarks>
public abstract class Retriever<T> : IRetriever<T>, IDisposable
{
    private readonly Thread worker;
    private readonly BlockingCollection<Uri> pending;
    private volatile int isStarted;
    private volatile int isDisposing;

    /// <summary>Raised on the worker thread after a URI has been retrieved.</summary>
    public event EventHandler<RetrievalEventArgs<T>> Retrieved;

    /// <summary>
    /// Creates the (not yet started) worker thread and an empty FIFO queue.
    /// </summary>
    protected Retriever()
    {
        this.worker = new Thread(this.RetrieveResources);
        this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
        this.isStarted = 0;
        this.isDisposing = 0;
    }

    ~Retriever()
    {
        this.Dispose(false);
    }

    /// <summary>
    /// Worker loop: waits while the retriever is stopped, then takes the next URI,
    /// retrieves it synchronously and raises <see cref="Retrieved"/>.
    /// Exits once disposal has begun or the queue has been completed and drained.
    /// </summary>
    private void RetrieveResources()
    {
        while (this.isDisposing == 0)
        {
            // Monitor.Wait requires ownership of the monitor, hence the lock.
            // The disposing flag is part of the condition so Dispose can wake
            // a paused worker and let it exit.
            lock (this.pending)
            {
                while (this.isStarted == 0 && this.isDisposing == 0)
                {
                    Monitor.Wait(this.pending);
                }
            }

            if (this.isDisposing != 0)
            {
                break;
            }

            Uri location;

            try
            {
                location = this.pending.Take();
            }
            catch (InvalidOperationException)
            {
                // Take() throws once CompleteAdding() has been called and the
                // queue is empty; that only happens during disposal.
                break;
            }

            // This is the part that would ideally run concurrently.
            // Here it is synchronous, just on a separate thread.
            T result = this.RetrieveResource(location);

            this.RaiseRetrieved(location, result);
        }
    }

    /// <summary>Invokes the <see cref="Retrieved"/> handlers, if any are attached.</summary>
    private void RaiseRetrieved(Uri location, T result)
    {
        // Copy to a local so a concurrent unsubscribe cannot null the delegate
        // between the check and the call.
        EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved;
        if (callback != null)
        {
            callback(this, new RetrievalEventArgs<T>(location, result));
        }
    }

    /// <summary>Throws if disposal of this instance has begun.</summary>
    private void ThrowIfDisposed()
    {
        if (this.isDisposing != 0)
        {
            throw new ObjectDisposedException(this.GetType().Name);
        }
    }

    /// <summary>Retrieves the resource identified by <paramref name="location"/>.</summary>
    /// <param name="location">The URI taken from the queue.</param>
    /// <returns>The retrieved value.</returns>
    protected abstract T RetrieveResource(Uri location);

    /// <summary>
    /// Shuts the retriever down. Only the first call has any effect.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called
    /// from the finalizer, in which case no other object is touched.
    /// </param>
    protected void Dispose(bool disposing)
    {
        if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
        {
            return;
        }

        if (!disposing)
        {
            return;
        }

        // Unblocks a worker that is sitting inside Take() on an empty queue.
        this.pending.CompleteAdding();

        bool workerWasStarted;

        lock (this.pending)
        {
            // Wake a worker that is parked because the retriever is stopped.
            Monitor.PulseAll(this.pending);

            // Start() launches the worker under this same lock, so the state
            // read here cannot change afterwards (Start re-checks the
            // disposing flag under the lock).
            workerWasStarted = (this.worker.ThreadState & ThreadState.Unstarted) == 0;
        }

        // Join() on a never-started thread throws ThreadStateException, and
        // joining the current thread (Dispose called from a Retrieved handler)
        // would never return.
        if (workerWasStarted && this.worker != Thread.CurrentThread)
        {
            this.worker.Join();
        }

        this.pending.Dispose();
    }

    /// <summary>
    /// Queues a URI for retrieval. The URI is silently ignored if the retriever
    /// no longer accepts items (it is being or has been disposed).
    /// </summary>
    /// <param name="uri">The URI to enqueue.</param>
    public void Add(Uri uri)
    {
        try
        {
            this.pending.Add(uri);
        }
        catch (InvalidOperationException)
        {
            // Best effort by design: adding was completed (or the collection
            // was disposed - ObjectDisposedException derives from this type).
            return;
        }
    }

    /// <summary>
    /// Queues several URIs for retrieval, stopping at the first one that cannot
    /// be added because the retriever no longer accepts items.
    /// </summary>
    /// <param name="uris">The URIs to enqueue, in order.</param>
    /// <exception cref="ArgumentNullException"><paramref name="uris"/> is null.</exception>
    public void AddRange(IEnumerable<Uri> uris)
    {
        if (uris == null)
        {
            throw new ArgumentNullException("uris");
        }

        foreach (Uri uri in uris)
        {
            try
            {
                this.pending.Add(uri);
            }
            catch (InvalidOperationException)
            {
                // Same best-effort policy as Add(): nothing further can be queued.
                return;
            }
        }
    }

    /// <summary>
    /// Starts (or resumes) processing of the queue.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The retriever has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The retriever is already started.</exception>
    public void Start()
    {
        this.ThrowIfDisposed();

        if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
        {
            throw new InvalidOperationException("The retriever is already started.");
        }

        lock (this.pending)
        {
            // Re-check under the lock so the worker is never started after
            // Dispose has decided whether it needs to join it.
            this.ThrowIfDisposed();

            // ThreadState is a flags enum: test the bit instead of comparing
            // for equality.
            if ((this.worker.ThreadState & ThreadState.Unstarted) != 0)
            {
                this.worker.Start();
            }

            Monitor.Pulse(this.pending);
        }
    }

    /// <summary>
    /// Pauses processing. The worker checks the flag before taking its next item;
    /// a retrieval (or a blocking take) already in progress is not interrupted.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The retriever has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The retriever is already stopped.</exception>
    public void Stop()
    {
        this.ThrowIfDisposed();

        if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
        {
            throw new InvalidOperationException("The retriever is already stopped.");
        }
    }

    /// <summary>Stops the worker thread and releases the queue.</summary>
    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}

以上面的例子为基础……下面是我想到的一种解决方案,但我认为它增加了太多的复杂性,或者更确切地说,引入了一些奇怪的代码。

    /// <summary>
    /// Worker loop (task-based variant): waits while the retriever is stopped,
    /// then takes the next URI and hands its retrieval to a <see cref="Task{T}"/>.
    /// A continuation raises <c>Retrieved</c> when the task succeeds.
    /// </summary>
    private void RetrieveResources()
    {
        while (this.isDisposing == 0)
        {
            // Monitor.Wait must be called while owning the monitor; without
            // the lock it throws SynchronizationLockException. The disposing
            // flag is part of the condition so a paused worker can be woken
            // up for shutdown.
            lock (this.pending)
            {
                while (this.isStarted == 0 && this.isDisposing == 0)
                {
                    Monitor.Wait(this.pending);
                }
            }

            if (this.isDisposing != 0)
            {
                break;
            }

            Uri location;

            try
            {
                location = this.pending.Take();
            }
            catch (InvalidOperationException)
            {
                // Thrown once adding has been completed and the queue is empty.
                break;
            }

            Task<T> task = new Task<T>((state) =>
                {
                    // Direct cast: a wrong state object should fail loudly
                    // instead of silently becoming null.
                    return this.RetrieveResource((Uri)state);
                }, location);

            task.ContinueWith((t) =>
                {
                    if (t.Status != TaskStatus.RanToCompletion)
                    {
                        // Reading t.Result here would rethrow inside the
                        // continuation and leave an unobserved exception.
                        // Reading t.Exception marks the failure as observed.
                        System.Diagnostics.Trace.TraceError(
                            "Retrieval of {0} failed: {1}", location, t.Exception);
                        return;
                    }

                    RetrievalEventArgs<T> args = new RetrievalEventArgs<T>(location, t.Result);

                    // Copy to a local so a concurrent unsubscribe cannot null
                    // the delegate between the check and the call.
                    EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved;
                    if (callback != null)
                    {
                        callback(this, args);
                    }
                });

            // Started only after the continuation is attached.
            task.Start();
        }
    }
4

1 回答 1

2

我想出了一个很好的解决方案。我抽象了检索资源的方法和结果的表示。这允许支持检索具有任意结果的任意 URI;有点像一些 URI 驱动的“ORM”。

它支持可变的并发级别。前几天我发布这个问题时,忘记了异步和并发是完全不同的两回事:我通过任务得到的只是异步,结果反而堵塞了任务调度程序,而我真正想要的是并发。

我添加了取消功能,因为拥有启动/停止功能似乎是个好主意。

/// <summary>
/// Base class for a retriever that queues URIs and resolves them on a fixed number
/// of dedicated worker threads by calling <see cref="Retrieve"/>.
/// </summary>
/// <typeparam name="T">Type of the value produced for each retrieved URI.</typeparam>
/// <remarks>
/// <c>locker</c> guards the <c>cancellation</c> field and the starting of threads,
/// and is the monitor on which workers park while the retriever is stopped.
/// </remarks>
public abstract class Retriever<T> : IRetriever<T>, IDisposable
{
    private readonly object locker;
    private readonly BlockingCollection<Uri> pending;
    private readonly Thread[] threads;
    private CancellationTokenSource cancellation;

    private volatile int isStarted;
    private volatile int isDisposing;

    /// <summary>Raised on a worker thread after a URI has been retrieved.</summary>
    public event EventHandler<RetrieverEventArgs<T>> Retrieved;

    /// <summary>
    /// Creates the queue and <paramref name="concurrency"/> unstarted worker threads.
    /// </summary>
    /// <param name="concurrency">Number of worker threads; must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="concurrency"/> is zero or negative.
    /// </exception>
    protected Retriever(int concurrency)
    {
        if (concurrency <= 0)
        {
            throw new ArgumentOutOfRangeException("concurrency", "The specified concurrency level must be greater than zero.");
        }

        this.locker = new object();
        this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
        this.threads = new Thread[concurrency];
        this.cancellation = new CancellationTokenSource();

        this.isStarted = 0;
        this.isDisposing = 0;

        this.InitializeThreads();
    }

    ~Retriever()
    {
        this.Dispose(false);
    }

    /// <summary>Creates (but does not start) one background thread per slot.</summary>
    private void InitializeThreads()
    {
        for (int i = 0; i < this.threads.Length; i++)
        {
            Thread thread = new Thread(this.ProcessQueue)
            {
                IsBackground = true
            };

            this.threads[i] = thread;
        }
    }

    /// <summary>
    /// Starts every worker that has not been started yet.
    /// Must be called while holding <c>locker</c>.
    /// </summary>
    private void StartThreads()
    {
        foreach (Thread thread in this.threads)
        {
            // ThreadState is a flags enum. These threads are created with
            // IsBackground = true, so their state is Background | Unstarted
            // and an equality test against Unstarted never matches.
            if ((thread.ThreadState & ThreadState.Unstarted) != 0)
            {
                thread.Start();
            }
        }
    }

    /// <summary>
    /// Cancels the operations bound to the current token.
    /// </summary>
    /// <param name="reset">
    /// <c>true</c> to install a fresh token source for later operations (used by
    /// <see cref="Stop"/>); <c>false</c> to leave the cancelled one in place.
    /// </param>
    private void CancelOperations(bool reset)
    {
        CancellationTokenSource current;

        lock (this.locker)
        {
            current = this.cancellation;

            if (reset)
            {
                this.cancellation = new CancellationTokenSource();
            }
        }

        // Cancel outside the lock: cancellation callbacks run synchronously.
        //
        // The source is deliberately not disposed here. Workers may still be
        // using a token obtained from it, and the Token property (and a
        // token's WaitHandle) throws ObjectDisposedException once the source
        // is disposed. A retired source is left to the garbage collector; the
        // final one is disposed in Dispose after all workers have exited.
        current.Cancel();
    }

    /// <summary>Blocks until every started worker thread has exited.</summary>
    private void WaitForThreadsToExit()
    {
        foreach (Thread thread in this.threads)
        {
            // Join() on a never-started thread throws ThreadStateException,
            // and joining the current thread (Dispose called from a Retrieved
            // handler) would never return.
            bool wasStarted = (thread.ThreadState & ThreadState.Unstarted) == 0;

            if (wasStarted && thread != Thread.CurrentThread)
            {
                thread.Join();
            }
        }
    }

    /// <summary>
    /// Worker loop: waits while the retriever is stopped, then takes the next URI,
    /// retrieves it and raises <see cref="Retrieved"/>. Exceptions other than
    /// <see cref="OperationCanceledException"/> are not caught here.
    /// </summary>
    private void ProcessQueue()
    {
        while (this.isDisposing == 0)
        {
            CancellationToken token;

            lock (this.locker)
            {
                // Monitor.Wait requires ownership of the monitor. The
                // disposing flag is part of the condition so Dispose can wake
                // parked workers and let them exit.
                while (this.isStarted == 0 && this.isDisposing == 0)
                {
                    Monitor.Wait(this.locker);
                }

                if (this.isDisposing != 0)
                {
                    break;
                }

                // Capture the token once, under the lock, so the take and the
                // retrieval of one item share a token and never observe a
                // half-replaced source.
                token = this.cancellation.Token;
            }

            Uri location;

            try
            {
                location = this.pending.Take(token);
            }
            catch (OperationCanceledException)
            {
                continue;
            }

            T data;

            try
            {
                data = this.Retrieve(location, token);
            }
            catch (OperationCanceledException)
            {
                // NOTE(review): the URI has already been removed from the
                // queue, so a retrieval cancelled by Stop() is dropped, not
                // re-queued. Confirm that this is the intended behaviour.
                continue;
            }

            RetrieverEventArgs<T> args = new RetrieverEventArgs<T>(location, data);

            // Copy to a local so a concurrent unsubscribe cannot null the
            // delegate between the check and the call.
            EventHandler<RetrieverEventArgs<T>> callback = this.Retrieved;
            if (callback != null)
            {
                callback(this, args);
            }
        }
    }

    /// <summary>Throws if disposal of this instance has begun.</summary>
    private void ThrowIfDisposed()
    {
        if (this.isDisposing == 1)
        {
            throw new ObjectDisposedException("Retriever");
        }
    }

    /// <summary>Retrieves the resource identified by <paramref name="location"/>.</summary>
    /// <param name="location">The URI taken from the queue.</param>
    /// <param name="token">Token that is cancelled when the retriever is stopped or disposed.</param>
    /// <returns>The retrieved value.</returns>
    protected abstract T Retrieve(Uri location, CancellationToken token);

    /// <summary>
    /// Shuts the retriever down. Only the first call has any effect.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called
    /// from the finalizer, in which case no other object is touched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
        {
            return;
        }

        if (!disposing)
        {
            return;
        }

        // Interrupt workers blocked in Take() or inside Retrieve().
        this.CancelOperations(false);

        lock (this.locker)
        {
            // Wake workers parked because the retriever is stopped. Taking
            // the lock also orders this call after any Start() that is in the
            // middle of launching threads.
            Monitor.PulseAll(this.locker);
        }

        this.WaitForThreadsToExit();

        // Safe now: the joined workers can no longer use either object.
        this.cancellation.Dispose();
        this.pending.Dispose();
    }

    /// <summary>
    /// Starts (or resumes) processing of the queue.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The retriever has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The retriever is already started.</exception>
    public void Start()
    {
        this.ThrowIfDisposed();

        if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
        {
            throw new InvalidOperationException("The retriever is already started.");
        }

        lock (this.locker)
        {
            // Re-check under the lock so no thread is started after Dispose
            // has begun waiting for the workers.
            this.ThrowIfDisposed();

            this.StartThreads();
            Monitor.PulseAll(this.locker);
        }
    }

    /// <summary>Queues a URI for retrieval.</summary>
    /// <param name="location">The URI to enqueue.</param>
    public void Add(Uri location)
    {
        this.pending.Add(location);
    }

    /// <summary>
    /// Pauses processing and cancels the operations that are currently in flight.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The retriever has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The retriever is already stopped.</exception>
    public void Stop()
    {
        this.ThrowIfDisposed();

        if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
        {
            throw new InvalidOperationException("The retriever is already stopped.");
        }

        this.CancelOperations(true);
    }

    /// <summary>Stops all worker threads and releases the queue.</summary>
    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}
于 2012-10-25T07:10:59.160 回答