2

我见过很多好的对象池实现。例如:C# 对象池模式实现

但似乎线程安全的总是使用锁,从不尝试使用 Interlocked.* 操作。

编写一个不允许将对象返回到池中的方法似乎很容易(只是一个带有 Interlocked.Increments 指针的大数组)。但是我想不出任何方法来编写一个让您返回对象的方法。有人做过吗?

4

6 回答 6

2

仔细想想为什么你需要对象池——这里没有讨论池化的对象。对于大多数对象,使用托管堆将提供必要的功能,而无需在您自己的代码中使用新的池管理器。只有当您的对象封装了难以建立或难以释放的资源时,托管代码中的对象池才值得考虑。

如果您确实需要自己做,那么有一个轻量级的读取器/写入器锁可能有助于优化池访问。

http://theburningmonk.com/2010/02/threading-using-readerwriterlockslim/

于 2010-09-07T22:13:05.680 回答
2

我已经使用构建为单链表的无锁队列来完成它。以下内容删除了一些不相关的内容,我没有在删除这些内容的情况下对其进行测试,但至少应该给出这个想法。

internal sealed class LockFreeQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next;
    public Node(T item)
    {
      Item = item;
    }
  }
  private volatile Node _head;
  private volatile Node _tail;
  public LockFreeQueue()
  {
    _head = _tail = new Node(default(T));
  }
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    for(;;)
    {
      Node curTail = _tail;
      if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)   //append to the tail if it is indeed the tail.
      {
        Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.
        return;
      }
      else
      {
        Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
      }
    }
  }    
  public bool TryDequeue(out T item)
  {
    for(;;)
    {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (curHead == curTail)
      {
        if (curHeadNext == null)
        {
          item = default(T);
          return false;
        }
        else
          Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);   // assist obstructing thread
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
        {
          return true;
        }
      }
    }
  }
#pragma warning restore 420
}

如果您使用池的原因是对分配和收集的原始性能考虑,那么分配和收集的事实使它变得毫无用处。如果是因为获取和/或释放底层资源的成本很高,或者因为实例缓存了正在使用的“学习”信息,那么它可能适合。

于 2010-09-08T01:10:52.340 回答
1

您是否查看过 .Net 4 中的并发集合。

例如http://msdn.microsoft.com/en-us/library/dd287191.aspx

于 2010-09-08T02:29:27.327 回答
1

返回引用对象的问题在于它首先破坏了锁定对它的访问的整个尝试。您不能使用基本的 lock() 命令来控制对对象范围之外的资源的访问,这意味着传统的 getter/setter 设计不起作用。

可以工作的是包含可锁定资源的对象,并允许传入将使用资源的 lambda 或委托。该对象将锁定资源,运行委托,然后在委托完成时解锁。这基本上将运行代码的控制权交给了锁定对象,但允许比 Interlocked 更复杂的操作。

另一种可能的方法是公开 getter 和 setter,但使用“checkout”模型实现您自己的访问控制;当允许线程“获取”一个值时,将当前线程的引用保存在锁定的内部资源中。在该线程调用 setter、中止等之前,所有其他尝试访问 getter 的线程都保留在 Yield 循环中。重新签入资源后,下一个线程可以获取它。

public class Library
{
   private Book controlledBook
   private Thread checkoutThread;

   public Book CheckOutTheBook()
   {
      while(Thread.CurrentThread != checkoutThread && checkoutThread.IsAlive)
          thread.CurrentThread.Yield();

      lock(this)
      {
         checkoutThread = Thread.CurrentThread;

         return controlledBook;
      }
   }

   public void CheckInTheBook(Book theBook)
   {
      if(Thread.CurrentThread != checkoutThread)
          throw new InvalidOperationException("This thread does not have the resource checked out.");

      lock(this)
      {
         checkoutThread = null;

         controlledBook = theBook;
      }
   }

}

现在,请注意,这仍然需要对象用户之间的一些合作。特别是,这种逻辑对于 setter 来说是相当幼稚的;没有签出就不可能签入一本书。这条规则对消费者来说可能并不明显,不当使用可能会导致未处理的异常。此外,所有用户都必须知道如果他们将在终止之前停止使用该对象,则必须重新签入该对象,即使基本的 C# 知识会规定,如果您获得引用类型,您所做的更改会在任何地方反映出来。但是,这可以用作对非线程安全资源的基本“一次一个”访问控制。

于 2010-09-07T22:16:25.130 回答
1

好问题。在编写高性能软件时,使用快速对象池采用零分配模式是至关重要的。

微软在 Apache License 2.0 下发布了一个对象池

它避免使用锁,并且只使用 Interlocked.CompareExchange 进行分配 (Get)。当您一次获取和释放几个对象时,这似乎特别快,这是大多数用例。如果您获得大量对象,则似乎不太优化,然后释放该批次,因此如果您的应用程序行为如此,您应该修改。

正如您所建议的,我认为 Interlocked.Increment 方法可能更通用,并且更适合批处理用例。

http://sourceroslyn.io/#Microsoft.CodeAnalysis.Workspaces/ObjectPool%25601.cs,98aa6d9b3c4e313b

// Copyright (c) Microsoft.  All Rights Reserved.  Licensed under the Apache License, Version 2.0.  See License.txt in the project root for license information.

// define TRACE_LEAKS to get additional diagnostics that can lead to the leak sources. note: it will
// make everything about 2-3x slower
// 
// #define TRACE_LEAKS

// define DETECT_LEAKS to detect possible leaks
// #if DEBUG
// #define DETECT_LEAKS  //for now always enable DETECT_LEAKS in debug.
// #endif

using System;
using System.Diagnostics;
using System.Threading;

#if DETECT_LEAKS
using System.Runtime.CompilerServices;

#endif
namespace Microsoft.CodeAnalysis.PooledObjects
{
    /// <summary>
    /// Generic implementation of object pooling pattern with predefined pool size limit. The main
    /// purpose is that limited number of frequently used objects can be kept in the pool for
    /// further recycling.
    /// 
    /// Notes: 
    /// 1) it is not the goal to keep all returned objects. Pool is not meant for storage. If there
    ///    is no space in the pool, extra returned objects will be dropped.
    /// 
    /// 2) it is implied that if object was obtained from a pool, the caller will return it back in
    ///    a relatively short time. Keeping checked out objects for long durations is ok, but 
    ///    reduces usefulness of pooling. Just new up your own.
    /// 
    /// Not returning objects to the pool in not detrimental to the pool's work, but is a bad practice. 
    /// Rationale: 
    ///    If there is no intent for reusing the object, do not use pool - just use "new". 
    /// </summary>
    internal class ObjectPool<T> where T : class
    {
        [DebuggerDisplay("{Value,nq}")]
        private struct Element
        {
            internal T Value;
        }

        /// <remarks>
        /// Not using System.Func{T} because this file is linked into the (debugger) Formatter,
        /// which does not have that type (since it compiles against .NET 2.0).
        /// </remarks>
        internal delegate T Factory();

        // Storage for the pool objects. The first item is stored in a dedicated field because we
        // expect to be able to satisfy most requests from it.
        private T _firstItem;
        private readonly Element[] _items;

        // factory is stored for the lifetime of the pool. We will call this only when pool needs to
        // expand. compared to "new T()", Func gives more flexibility to implementers and faster
        // than "new T()".
        private readonly Factory _factory;

#if DETECT_LEAKS
        private static readonly ConditionalWeakTable<T, LeakTracker> leakTrackers = new ConditionalWeakTable<T, LeakTracker>();

        private class LeakTracker : IDisposable
        {
            private volatile bool disposed;

#if TRACE_LEAKS
            internal volatile object Trace = null;
#endif

            public void Dispose()
            {
                disposed = true;
                GC.SuppressFinalize(this);
            }

            private string GetTrace()
            {
#if TRACE_LEAKS
                return Trace == null ? "" : Trace.ToString();
#else
                return "Leak tracing information is disabled. Define TRACE_LEAKS on ObjectPool`1.cs to get more info \n";
#endif
            }

            ~LeakTracker()
            {
                if (!this.disposed && !Environment.HasShutdownStarted)
                {
                    var trace = GetTrace();

                    // If you are seeing this message it means that object has been allocated from the pool 
                    // and has not been returned back. This is not critical, but turns pool into rather 
                    // inefficient kind of "new".
                    Debug.WriteLine($"TRACEOBJECTPOOLLEAKS_BEGIN\nPool detected potential leaking of {typeof(T)}. \n Location of the leak: \n {GetTrace()} TRACEOBJECTPOOLLEAKS_END");
                }
            }
        }
#endif

        internal ObjectPool(Factory factory)
            : this(factory, Environment.ProcessorCount * 2)
        { }

        internal ObjectPool(Factory factory, int size)
        {
            Debug.Assert(size >= 1);
            _factory = factory;
            _items = new Element[size - 1];
        }

        private T CreateInstance()
        {
            var inst = _factory();
            return inst;
        }

        /// <summary>
        /// Produces an instance.
        /// </summary>
        /// <remarks>
        /// Search strategy is a simple linear probing which is chosen for it cache-friendliness.
        /// Note that Free will try to store recycled objects close to the start thus statistically 
        /// reducing how far we will typically search.
        /// </remarks>
        internal T Allocate()
        {
            // PERF: Examine the first element. If that fails, AllocateSlow will look at the remaining elements.
            // Note that the initial read is optimistically not synchronized. That is intentional. 
            // We will interlock only when we have a candidate. in a worst case we may miss some
            // recently returned objects. Not a big deal.
            T inst = _firstItem;
            if (inst == null || inst != Interlocked.CompareExchange(ref _firstItem, null, inst))
            {
                inst = AllocateSlow();
            }

#if DETECT_LEAKS
            var tracker = new LeakTracker();
            leakTrackers.Add(inst, tracker);

#if TRACE_LEAKS
            var frame = CaptureStackTrace();
            tracker.Trace = frame;
#endif
#endif
            return inst;
        }

        private T AllocateSlow()
        {
            var items = _items;

            for (int i = 0; i < items.Length; i++)
            {
                // Note that the initial read is optimistically not synchronized. That is intentional. 
                // We will interlock only when we have a candidate. in a worst case we may miss some
                // recently returned objects. Not a big deal.
                T inst = items[i].Value;
                if (inst != null)
                {
                    if (inst == Interlocked.CompareExchange(ref items[i].Value, null, inst))
                    {
                        return inst;
                    }
                }
            }

            return CreateInstance();
        }

        /// <summary>
        /// Returns objects to the pool.
        /// </summary>
        /// <remarks>
        /// Search strategy is a simple linear probing which is chosen for it cache-friendliness.
        /// Note that Free will try to store recycled objects close to the start thus statistically 
        /// reducing how far we will typically search in Allocate.
        /// </remarks>
        internal void Free(T obj)
        {
            Validate(obj);
            ForgetTrackedObject(obj);

            if (_firstItem == null)
            {
                // Intentionally not using interlocked here. 
                // In a worst case scenario two objects may be stored into same slot.
                // It is very unlikely to happen and will only mean that one of the objects will get collected.
                _firstItem = obj;
            }
            else
            {
                FreeSlow(obj);
            }
        }

        private void FreeSlow(T obj)
        {
            var items = _items;
            for (int i = 0; i < items.Length; i++)
            {
                if (items[i].Value == null)
                {
                    // Intentionally not using interlocked here. 
                    // In a worst case scenario two objects may be stored into same slot.
                    // It is very unlikely to happen and will only mean that one of the objects will get collected.
                    items[i].Value = obj;
                    break;
                }
            }
        }

        /// <summary>
        /// Removes an object from leak tracking.  
        /// 
        /// This is called when an object is returned to the pool.  It may also be explicitly 
        /// called if an object allocated from the pool is intentionally not being returned
        /// to the pool.  This can be of use with pooled arrays if the consumer wants to 
        /// return a larger array to the pool than was originally allocated.
        /// </summary>
        [Conditional("DEBUG")]
        internal void ForgetTrackedObject(T old, T replacement = null)
        {
#if DETECT_LEAKS
            LeakTracker tracker;
            if (leakTrackers.TryGetValue(old, out tracker))
            {
                tracker.Dispose();
                leakTrackers.Remove(old);
            }
            else
            {
                var trace = CaptureStackTrace();
                Debug.WriteLine($"TRACEOBJECTPOOLLEAKS_BEGIN\nObject of type {typeof(T)} was freed, but was not from pool. \n Callstack: \n {trace} TRACEOBJECTPOOLLEAKS_END");
            }

            if (replacement != null)
            {
                tracker = new LeakTracker();
                leakTrackers.Add(replacement, tracker);
            }
#endif
        }

#if DETECT_LEAKS
        private static Lazy<Type> _stackTraceType = new Lazy<Type>(() => Type.GetType("System.Diagnostics.StackTrace"));

        private static object CaptureStackTrace()
        {
            return Activator.CreateInstance(_stackTraceType.Value);
        }
#endif

        [Conditional("DEBUG")]
        private void Validate(object obj)
        {
            Debug.Assert(obj != null, "freeing null?");

            Debug.Assert(_firstItem != obj, "freeing twice?");

            var items = _items;
            for (int i = 0; i < items.Length; i++)
            {
                var value = items[i].Value;
                if (value == null)
                {
                    return;
                }

                Debug.Assert(value != obj, "freeing twice?");
            }
        }
    }
}
于 2017-12-22T06:33:23.633 回答
0

我看不到使用 Interlocked 的任何真正好处,因为它必须以不安全的方式使用。锁定,只是更改对象内存空间上的一个位标志 - 确实非常非常快。互锁稍微好一点,因为它可以在寄存器上而不是在内存中完成。

您是否遇到性能问题?此类代码的主要目的是什么?归根结底,C# 旨在从您那里抽象出内存管理,以便您专注于业务问题。

请记住,如果您需要自己管理内存并使用不安全的指针,则必须固定内存区域 = 额外的性能成本。

于 2010-09-07T21:57:58.137 回答