4

我得到了以下代码(在多线程环境中效果不佳)

public class SomeClass
{
    private readonly ConcurrentQueue<ISocketWriterJob> _writeQueue = new ConcurrentQueue<ISocketWriterJob>();
    private ISocketWriterJob _currentJob;

    public void Send(ISocketWriterJob job)
    {
        if (_currentJob != null)
        {
            _writeQueue.Enqueue(job);
            return;
        }

        _currentJob = job;
        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.

        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();
            if (!_writeQueue.TryDequeue(out _currentJob))
            {
                _currentJob = null;
                return;
            }
        }

        _currentJob.Write(_writeArgs);

        // the job is invoked asycnhronously here.
    }
}

如果当前没有正在执行的作业,则 Send 方法应该异步调用作业。如果有的话,它应该排队工作。

锁定_currentJob分配/检查将使一切正常。但是有没有一种无锁的方法来解决它?

更新

我正在使用套接字,它是SendAsync发送信息的方法。这意味着我不知道Send()调用该方法时是否有写/作业挂起。

4

4 回答 4

4

考虑使用CompareExchange关于预期状态转换的假设。无需使用 ConcurrentQueue,因为现在我们可以控制同步。

更新以使用状态机
再次更新以删除不需要Interlocked.Exchange的(用于状态分配)。

public class SomeClass
{
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private ISocketWriterJob _currentJob;
    private enum State { Idle, Active, Enqueue, Dequeue };
    private State _state;

    public void Send(ISocketWriterJob job)
    {
        bool spin = true;
        while(spin)
        {
            switch(_state)
            {
            case State.Idle:
                if (Interlocked.CompareExchange(ref _state, State.Active, State.Idle) == State.Idle)
                {
                    spin = false;
                }
                // else consider new state
                break;
            case State.Active:
                if (Interlocked.CompareExchange(ref _state, State.Enqueue, State.Active) == State.Active)
                {
                    _writeQueue.Enqueue(job);
                    _state = State.Active;
                    return;
                }
                // else consider new state
                break;
            case State.Enqueue:
            case State.Dequeue:
                // spin to wait for new state
                Thread.Yield();
                break;
            }
        }

        _currentJob = job;
        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.

        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();

            bool spin = true;
            while(spin)
            {
                switch(_state)
                {
                case State.Active:
                    if (Interlocked.CompareExchange(ref _state, State.Dequeue, State.Active) == State.Active)
                    {
                        if (!_writeQueue.TryDequeue(out _currentJob))
                        {
                            // handle in state _currentJob = null;
                            _state = State.Idle;
                            return;
                        }
                        else
                        {
                            _state = State.Active;
                        }
                    }
                    // else consider new state
                    break;

                case State.Enqueue:
                    // spin to wait for new state
                    Thread.Yield();
                    break;

                // impossible states
                case State.Idle:
                case State.Dequeue:
                    break;
                }
            }
        }

        _logger.Debug(_writeArgs.GetHashCode() + ": writing more ");
        _currentJob.Write(_writeArgs);

        // the job is invoked asycnhronously here.
    }
}
于 2012-12-03T12:49:03.140 回答
1

目前,您的生产者和消费者之间的分歧有点模糊;您有“将作业放入队列或立即使用”和“从队列中使用作业,如果没有则退出”;更清楚的是“将作业放入队列”和“从队列中使用作业(最初)”和“从队列中使用作业(一旦作业完成”)。

这里的诀窍是使用 aBlockingCollection这样您就可以等待工作出现:

BlockingCollection<ISocketWriterJob> _writeQueue =
         new BlockingCollection<ISocketWriterJob>();

让线程Send从字面上调用只是排队一个作业:

public void Send(ISocketWriterJob job)
{
    _writeQueue.Add(job);
}

然后有另一个只消耗作业的线程。

public void StartConsumingJobs()
{
    // Get the first job or wait for one to be queued.
    _currentJob = _writeQueue.Take();

    // Start job
}

private void HandleWriteCompleted(SocketError error, int bytesTransferred)
{
    if (_currentJob.WriteCompleted(bytesTransferred))
    {
        _currentJob.Dispose();

        // Get next job, or wait for one to be queued.
        _currentJob = _writeQueue.Take();
    }

    _currentJob.Write(_writeArgs);

   // Start/continue job as before
}
于 2012-12-03T13:15:57.477 回答
0

我不认为你会从使用无锁技术中获得什么。即使使用简单的锁,您也可以保持在用户模式,因为Monitor.Enter/Monitor.Exit使用了首先旋转,并且只有当您在等待状态等待更长时间时,它们才会转换到内核模式。

这意味着基于锁的技术将与任何无锁技术一样好,因为您可以锁定仅用于将作业存储到队列中并从中取回,但是您将拥有每个开发人员都可以使用的清晰和健壮的代码理解:

public class SomeClass
{
    // We don't have to use Concurrent collections
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private readonly object _syncRoot = new object();
    private ISocketWriterJob _currentJob;

    public void Send(ISocketWriterJob job)
    {
        lock(_syncRoot)
        {
            if (_currentJob != null)
            {
                _writeQueue.Enqueue(job);
                return;
            }
            _currentJob = job;
        }

        // Use job instead of shared state
        StartJob(job);
    }

    private void StartJob(ISocketWriterJob job)
    {
       job.Write(_writeArgs);
       // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        ISocketWriterJob currentJob = null;

        // error checks etc removed for this sample.
        lock(_syncRoot)
        {
           // I suppose this operation pretty fast as well as Dispose
           if (_currentJob.WriteCompleted(bytesTransferred))
            {
               _currentJob.Dispose();
              // There is no TryDequeue method in Queue<T>
              // But we can easily add it using extension method
              if (!_writeQueue.TryDequeue(out _currentJob))
              {
                  // We don't have set _currentJob to null
                  // because we'll achieve it via out parameter
                  // _currentJob = null;
                  return;
              }
           }

           // Storing current job for further work
           currentJob = _currentJob;
        }

        StartJob(currentJob);
    }
}

无锁是一种优化,与任何其他优化一样,您应该首先测量性能,以确保您对简单的基于锁的实现有问题,并且只有当它是真的时 - 使用一些较低级别的技术,如无锁。性能和可维护性是一个经典的权衡,您应该非常谨慎地选择。

于 2012-12-03T18:26:18.680 回答
-2

您可以将当前作业标记为volatile应确保当前线程获得最新状态的作业。不过一般来说,锁定是有利的。

private volatile ISocketWriterJob _currentJob;
于 2012-12-03T12:18:44.727 回答