175

有没有人有很好的资源来为 Sql 连接池的有限资源实施共享对象池策略?(即完全实现它是线程安全的)。

为了跟进@Aaronaught 澄清请求,池的使用将用于对外部服务的负载平衡请求。把它放在一个可能比我直接的情况更容易立即理解的场景中。我有一个会话对象,其功能类似于ISessionNHibernate 中的对象。每个唯一会话都管理它与数据库的连接。目前我有 1 个长时间运行的会话对象,并且遇到了我的服务提供商限制我对这个单独会话的使用率的问题。

由于他们不期望单个会话将被视为长期运行的服务帐户,因此他们显然将其视为正在打击其服务的客户。这让我想到了这里的问题,我将创建一个不同会话池,并将请求拆分到多个会话中的服务,而不是像以前那样创建一个焦点,而不是创建一个单独的会话。

希望背景提供一些价值,但直接回答您的一些问题:

问:创建对象是否昂贵?
答:没有对象是有限资源池

问:它们会被非常频繁地收购/发布吗?
A:是的,他们可以再次被认为是 NHibernate ISessions,其中 1 通常在每个单个页面请求的持续时间内被获取和释放。

问:简单的先到先得就足够了,还是您需要更智能的东西,即可以防止饥饿?
答:一个简单的循环类型分发就足够了,我假设你的意思是如果没有可用的会话,调用者会被阻塞等待发布。这实际上并不适用,因为会话可以由不同的呼叫者共享。我的目标是在多个会话中分配使用,而不是 1 个会话。

我相信这可能与对象池的正常使用有所不同,这就是为什么我最初忽略了这部分并计划只是调整模式以允许共享对象,而不是让饥饿情况发生。

问:优先级、延迟加载与急切加载等问题呢?
答:不涉及优先级,为了简单起见,假设我将在创建池本身时创建可用对象池。

4

9 回答 9

326

由于几个未知因素,这个问题比人们预期的要复杂一些:被池化的资源的行为、对象的预期/所需生命周期、需要池的真正原因等。通常池是专用的 - 线程池、连接池等 - 因为当您确切知道资源的作用并且更重要的是可以控制该资源的实现方式时,更容易优化一个。

因为它不是那么简单,所以我试图做的是提供一种相当灵活的方法,您可以尝试并看看哪种方法最有效。 提前为这篇长文道歉,但在实施一个体面的通用资源池时,还有很多内容需要涵盖。我真的只是在摸索表面。

通用池必须有一些主要的“设置”,包括:

  • 资源加载策略 - 急切或懒惰;
  • 资源加载机制——如何实际构建一个;
  • 访问策略——你提到的“循环”并不像听起来那么简单;此实现可以使用类似但不完美的循环缓冲区,因为池无法控制何时实际回收资源。其他选项是 FIFO 和 LIFO;FIFO 将具有更多的随机访问模式,但 LIFO 使实施最近最少使用的释放策略变得更加容易(您说这超出了范围,但仍然值得一提)。

对于资源加载机制,.NET 已经为我们提供了一个干净的抽象——委托。

private Func<Pool<T>, T> factory;

将它传递给池的构造函数,我们就完成了。使用带有new()约束的泛型类型也可以,但这更灵活。


在其他两个参数中,访问策略是更复杂的野兽,所以我的方法是使用基于继承(接口)的方法:

public class Pool<T> : IDisposable
{
    // Other code - we'll come back to this

    interface IItemStore
    {
        T Fetch();
        void Store(T item);
        int Count { get; }
    }
}

这里的概念很简单——我们将让公共Pool类处理线程安全等常见问题,但对每种访问模式使用不同的“项目存储”。LIFO 很容易用堆栈表示,FIFO 是一个队列,我使用了一个不是非常优化但可能足够的循环缓冲区实现,它使用一个List<T>和索引指针来近似循环访问模式。

下面的所有类都是 the 的内部类Pool<T>- 这是一种样式选择,但由于这些确实不打算在 之外使用Pool,所以它是最有意义的。

    class QueueStore : Queue<T>, IItemStore
    {
        public QueueStore(int capacity) : base(capacity)
        {
        }

        public T Fetch()
        {
            return Dequeue();
        }

        public void Store(T item)
        {
            Enqueue(item);
        }
    }

    class StackStore : Stack<T>, IItemStore
    {
        public StackStore(int capacity) : base(capacity)
        {
        }

        public T Fetch()
        {
            return Pop();
        }

        public void Store(T item)
        {
            Push(item);
        }
    }

这些是显而易见的——堆栈和队列。我认为他们真的不需要太多解释。循环缓冲区稍微复杂一点:

    class CircularStore : IItemStore
    {
        private List<Slot> slots;
        private int freeSlotCount;
        private int position = -1;

        public CircularStore(int capacity)
        {
            slots = new List<Slot>(capacity);
        }

        public T Fetch()
        {
            if (Count == 0)
                throw new InvalidOperationException("The buffer is empty.");

            int startPosition = position;
            do
            {
                Advance();
                Slot slot = slots[position];
                if (!slot.IsInUse)
                {
                    slot.IsInUse = true;
                    --freeSlotCount;
                    return slot.Item;
                }
            } while (startPosition != position);
            throw new InvalidOperationException("No free slots.");
        }

        public void Store(T item)
        {
            Slot slot = slots.Find(s => object.Equals(s.Item, item));
            if (slot == null)
            {
                slot = new Slot(item);
                slots.Add(slot);
            }
            slot.IsInUse = false;
            ++freeSlotCount;
        }

        public int Count
        {
            get { return freeSlotCount; }
        }

        private void Advance()
        {
            position = (position + 1) % slots.Count;
        }

        class Slot
        {
            public Slot(T item)
            {
                this.Item = item;
            }

            public T Item { get; private set; }
            public bool IsInUse { get; set; }
        }
    }

我本可以选择许多不同的方法,但最重要的是资源应该按照创建它们的顺序访问,这意味着我们必须维护对它们的引用但将它们标记为“正在使用”(或不)。在最坏的情况下,只有一个插槽可用,并且每次提取都需要对缓冲区进行完整迭代。如果您汇集了数百个资源并且每秒获取和释放它们多次,那么这很糟糕;对于 5 到 10 个项目的池来说,这并不是一个真正的问题,在典型的情况下,资源消耗很少,它只需要提前一两个插槽。

请记住,这些类是私有内部类——这就是为什么它们不需要大量的错误检查,池本身限制对它们的访问。

抛出一个枚举和一个工厂方法,我们就完成了这部分:

// Outside the pool
public enum AccessMode { FIFO, LIFO, Circular };

    private IItemStore itemStore;

    // Inside the Pool
    private IItemStore CreateItemStore(AccessMode mode, int capacity)
    {
        switch (mode)
        {
            case AccessMode.FIFO:
                return new QueueStore(capacity);
            case AccessMode.LIFO:
                return new StackStore(capacity);
            default:
                Debug.Assert(mode == AccessMode.Circular,
                    "Invalid AccessMode in CreateItemStore");
                return new CircularStore(capacity);
        }
    }

下一个要解决的问题是加载策略。我定义了三种类型:

public enum LoadingMode { Eager, Lazy, LazyExpanding };

前两个应该是不言自明的;第三种是混合的,它延迟加载资源,但实际上直到池满才开始重新使用任何资源。如果您希望池已满(听起来像您这样做)但希望将实际创建它们的费用推迟到第一次访问(即改善启动时间),这将是一个很好的权衡。

加载方法真的不是太复杂,现在我们有了 item-store 抽象:

    private int size;
    private int count;

    private T AcquireEager()
    {
        lock (itemStore)
        {
            return itemStore.Fetch();
        }
    }

    private T AcquireLazy()
    {
        lock (itemStore)
        {
            if (itemStore.Count > 0)
            {
                return itemStore.Fetch();
            }
        }
        Interlocked.Increment(ref count);
        return factory(this);
    }

    private T AcquireLazyExpanding()
    {
        bool shouldExpand = false;
        if (count < size)
        {
            int newCount = Interlocked.Increment(ref count);
            if (newCount <= size)
            {
                shouldExpand = true;
            }
            else
            {
                // Another thread took the last spot - use the store instead
                Interlocked.Decrement(ref count);
            }
        }
        if (shouldExpand)
        {
            return factory(this);
        }
        else
        {
            lock (itemStore)
            {
                return itemStore.Fetch();
            }
        }
    }

    private void PreloadItems()
    {
        for (int i = 0; i < size; i++)
        {
            T item = factory(this);
            itemStore.Store(item);
        }
        count = size;
    }

上面的sizecount字段分别表示池的最大大小和池拥有的资源总数(但不一定可用)。 AcquireEager是最简单的,它假设一个项目已经在商店中 - 这些项目将在构建时预加载,即在PreloadItems最后显示的方法中。

AcquireLazy检查池中是否有空闲项目,如果没有,则创建一个新项目。 AcquireLazyExpanding只要池尚未达到其目标大小,就会创建一个新资源。我已经尝试对此进行优化以最小化锁定,我希望我没有犯任何错误(我已经在多线程条件下对此进行了测试,但显然并非详尽无遗)。

您可能想知道为什么这些方法都不费心检查商店是否已达到最大大小。一会儿我会讲到的。


现在是游泳池本身。这是完整的私有数据集,其中一些已经显示:

    private bool isDisposed;
    private Func<Pool<T>, T> factory;
    private LoadingMode loadingMode;
    private IItemStore itemStore;
    private int size;
    private int count;
    private Semaphore sync;

回答我在上一段中忽略的问题 - 如何确保我们限制创建的资源总数 - 事实证明 .NET 已经有一个非常好的工具,它被称为Semaphore,它专门设计用于允许固定访问资源的线程数(在这种情况下,“资源”是内部项目存储)。由于我们没有实现完整的生产者/消费者队列,这完全可以满足我们的需求。

构造函数如下所示:

    public Pool(int size, Func<Pool<T>, T> factory,
        LoadingMode loadingMode, AccessMode accessMode)
    {
        if (size <= 0)
            throw new ArgumentOutOfRangeException("size", size,
                "Argument 'size' must be greater than zero.");
        if (factory == null)
            throw new ArgumentNullException("factory");

        this.size = size;
        this.factory = factory;
        sync = new Semaphore(size, size);
        this.loadingMode = loadingMode;
        this.itemStore = CreateItemStore(accessMode, size);
        if (loadingMode == LoadingMode.Eager)
        {
            PreloadItems();
        }
    }

这里应该不足为奇。唯一需要注意的是急切加载的特殊情况,使用PreloadItems前面已经显示的方法。

由于现在几乎所有内容都已被干净地抽象出来,因此实际AcquireRelease方法非常简单:

    public T Acquire()
    {
        sync.WaitOne();
        switch (loadingMode)
        {
            case LoadingMode.Eager:
                return AcquireEager();
            case LoadingMode.Lazy:
                return AcquireLazy();
            default:
                Debug.Assert(loadingMode == LoadingMode.LazyExpanding,
                    "Unknown LoadingMode encountered in Acquire method.");
                return AcquireLazyExpanding();
        }
    }

    public void Release(T item)
    {
        lock (itemStore)
        {
            itemStore.Store(item);
        }
        sync.Release();
    }

如前所述,我们使用Semaphore来控制并发性,而不是虔诚地检查项目商店的状态。只要获得的物品被正确释放,就没有什么可担心的。

最后但并非最不重要的是,有清理:

    public void Dispose()
    {
        if (isDisposed)
        {
            return;
        }
        isDisposed = true;
        if (typeof(IDisposable).IsAssignableFrom(typeof(T)))
        {
            lock (itemStore)
            {
                while (itemStore.Count > 0)
                {
                    IDisposable disposable = (IDisposable)itemStore.Fetch();
                    disposable.Dispose();
                }
            }
        }
        sync.Close();
    }

    public bool IsDisposed
    {
        get { return isDisposed; }
    }

IsDisposed属性的用途将在稍后变得清晰。所有主要Dispose方法真正做的是处置实际的池项目(如果它们实现IDisposable)。


现在你基本上可以按原样使用这个try-finally块,但我不喜欢那种语法,因为如果你开始在类和方法之间传递池资源,那么它会变得非常混乱。使用资源的主类可能甚至没有对池的引用。它确实变得相当混乱,因此更好的方法是创建一个“智能”池对象。

假设我们从以下简单的接口/类开始:

public interface IFoo : IDisposable
{
    void Test();
}

public class Foo : IFoo
{
    private static int count = 0;

    private int num;

    public Foo()
    {
        num = Interlocked.Increment(ref count);
    }

    public void Dispose()
    {
        Console.WriteLine("Goodbye from Foo #{0}", num);
    }

    public void Test()
    {
        Console.WriteLine("Hello from Foo #{0}", num);
    }
}

这是我们假装的一次性Foo资源,它实现IFoo并具有一些用于生成唯一身份的样板代码。我们要做的是创建另一个特殊的池化对象:

public class PooledFoo : IFoo
{
    private Foo internalFoo;
    private Pool<IFoo> pool;

    public PooledFoo(Pool<IFoo> pool)
    {
        if (pool == null)
            throw new ArgumentNullException("pool");

        this.pool = pool;
        this.internalFoo = new Foo();
    }

    public void Dispose()
    {
        if (pool.IsDisposed)
        {
            internalFoo.Dispose();
        }
        else
        {
            pool.Release(this);
        }
    }

    public void Test()
    {
        internalFoo.Test();
    }
}

这只是将所有“真实”方法代理到它的内部IFoo(我们可以使用像 Castle 这样的动态代理库来做到这一点,但我不会深入探讨)。它还维护对Pool创建它的对象的引用,因此当我们创建Dispose这个对象时,它会自动将自己释放回池中。 除非池已经被释放——这意味着我们处于“清理”模式,在这种情况下,它实际上是清理内部资源


使用上面的方法,我们可以编写如下代码:

// Create the pool early
Pool<IFoo> pool = new Pool<IFoo>(PoolSize, p => new PooledFoo(p),
    LoadingMode.Lazy, AccessMode.Circular);

// Sometime later on...
using (IFoo foo = pool.Acquire())
{
    foo.Test();
}

这是一件非常好的事情。这意味着使用的代码IFoo(而不是创建它的代码)实际上不需要知道池。你甚至可以使用你最喜欢的 DI 库和作为提供者/工厂来注入对象。 IFooPool<T>


我已将完整的代码放在 PasteBin 上,供您享受复制和粘贴的乐趣。还有一个简短的测试程序,您可以使用它来玩弄不同的加载/访问模式和多线程条件,以使您自己确信它是线程安全的并且没有错误。

如果您对此有任何疑问或疑虑,请告诉我。

于 2010-04-03T22:40:11.067 回答
67

.NET Core 中的对象池

dotnet 核心在基类库 (BCL) 中添加了对象池的实现。您可以在此处阅读原始 GitHub 问题并查看System.Buffers的代码。目前ArrayPool是唯一可用的类型,用于池化数组。这里有一篇不错的博文。

namespace System.Buffers
{
    public abstract class ArrayPool<T>
    {
        public static ArrayPool<T> Shared { get; internal set; }

        public static ArrayPool<T> Create(int maxBufferSize = <number>, int numberOfBuffers = <number>);

        public T[] Rent(int size);

        public T[] Enlarge(T[] buffer, int newSize, bool clearBuffer = false);

        public void Return(T[] buffer, bool clearBuffer = false);
    }
}

在 ASP.NET Core 中可以看到其使用示例。因为它在 dotnet core BCL 中,所以 ASP.NET Core 可以与其他对象共享它的对象池,例如 Newtonsoft.Json 的 JSON 序列化程序。您可以阅读这篇博文,了解有关 Newtonsoft.Json 如何做到这一点的更多信息。

Microsoft Roslyn C# 编译器中的对象池

新的 Microsoft Roslyn C# 编译器包含ObjectPool类型,该类型用于汇集经常使用的对象,这些对象通常会被更新并经常被垃圾收集。这减少了必须发生的垃圾收集操作的数量和大小。有几个不同的子实现都使用了 ObjectPool(请参阅:为什么 Roslyn 中有这么多对象池的实现?)。

1 - SharedPools - 如果使用 BigDefault,则存储 20 个对象或 100 个对象的池。

// Example 1 - In a using statement, so the object gets freed at the end.
using (PooledObject<Foo> pooledObject = SharedPools.Default<List<Foo>>().GetPooledObject())
{
    // Do something with pooledObject.Object
}

// Example 2 - No using statement so you need to be sure no exceptions are not thrown.
List<Foo> list = SharedPools.Default<List<Foo>>().AllocateAndClear();
// Do something with list
SharedPools.Default<List<Foo>>().Free(list);

// Example 3 - I have also seen this variation of the above pattern, which ends up the same as Example 1, except Example 1 seems to create a new instance of the IDisposable [PooledObject<T>][4] object. This is probably the preferred option if you want fewer GC's.
List<Foo> list = SharedPools.Default<List<Foo>>().AllocateAndClear();
try
{
    // Do something with list
}
finally
{
    SharedPools.Default<List<Foo>>().Free(list);
}

2 - ListPoolStringBuilderPool - 不是严格分开的实现,而是围绕上面所示的 SharedPools 实现的包装器,专门用于 List 和 StringBuilder。所以这会重用存储在 SharedPools 中的对象池。

// Example 1 - No using statement so you need to be sure no exceptions are thrown.
StringBuilder stringBuilder= StringBuilderPool.Allocate();
// Do something with stringBuilder
StringBuilderPool.Free(stringBuilder);

// Example 2 - Safer version of Example 1.
StringBuilder stringBuilder= StringBuilderPool.Allocate();
try
{
    // Do something with stringBuilder
}
finally
{
    StringBuilderPool.Free(stringBuilder);
}

3 - PooledDictionaryPooledHashSet - 它们直接使用 ObjectPool 并具有完全独立的对象池。存储 128 个对象的池。

// Example 1
PooledHashSet<Foo> hashSet = PooledHashSet<Foo>.GetInstance()
// Do something with hashSet.
hashSet.Free();

// Example 2 - Safer version of Example 1.
PooledHashSet<Foo> hashSet = PooledHashSet<Foo>.GetInstance()
try
{
    // Do something with hashSet.
}
finally
{
    hashSet.Free();
}

Microsoft.IO.RecyclableMemoryStream

该库为MemoryStream对象提供池化。它是System.IO.MemoryStream. 它具有完全相同的语义。它是由 Bing 工程师设计的。阅读此处的博文或查看GitHub 上的代码。

var sourceBuffer = new byte[]{0,1,2,3,4,5,6,7}; 
var manager = new RecyclableMemoryStreamManager(); 
using (var stream = manager.GetStream()) 
{ 
    stream.Write(sourceBuffer, 0, sourceBuffer.Length); 
}

请注意,RecyclableMemoryStreamManager应该声明一次,它将在整个过程中存在——这就是池。如果您愿意,可以使用多个池。

于 2015-06-05T10:59:43.633 回答
7

像这样的东西可能适合您的需求。

/// <summary>
/// Represents a pool of objects with a size limit.
/// </summary>
/// <typeparam name="T">The type of object in the pool.</typeparam>
public sealed class ObjectPool<T> : IDisposable
    where T : new()
{
    private readonly int size;
    private readonly object locker;
    private readonly Queue<T> queue;
    private int count;


    /// <summary>
    /// Initializes a new instance of the ObjectPool class.
    /// </summary>
    /// <param name="size">The size of the object pool.</param>
    public ObjectPool(int size)
    {
        if (size <= 0)
        {
            const string message = "The size of the pool must be greater than zero.";
            throw new ArgumentOutOfRangeException("size", size, message);
        }

        this.size = size;
        locker = new object();
        queue = new Queue<T>();
    }


    /// <summary>
    /// Retrieves an item from the pool. 
    /// </summary>
    /// <returns>The item retrieved from the pool.</returns>
    public T Get()
    {
        lock (locker)
        {
            if (queue.Count > 0)
            {
                return queue.Dequeue();
            }

            count++;
            return new T();
        }
    }

    /// <summary>
    /// Places an item in the pool.
    /// </summary>
    /// <param name="item">The item to place to the pool.</param>
    public void Put(T item)
    {
        lock (locker)
        {
            if (count < size)
            {
                queue.Enqueue(item);
            }
            else
            {
                using (item as IDisposable)
                {
                    count--;
                }
            }
        }
    }

    /// <summary>
    /// Disposes of items in the pool that implement IDisposable.
    /// </summary>
    public void Dispose()
    {
        lock (locker)
        {
            count = 0;
            while (queue.Count > 0)
            {
                using (queue.Dequeue() as IDisposable)
                {

                }
            }
        }
    }
}

示例用法

public class ThisObject
{
    private readonly ObjectPool<That> pool = new ObjectPool<That>(100);

    public void ThisMethod()
    {
        var that = pool.Get();

        try
        { 
            // Use that ....
        }
        finally
        {
            pool.Put(that);
        }
    }
}
于 2010-04-02T00:57:12.187 回答
6

来自 MSDN 的示例:如何:使用 ConcurrentBag 创建对象池

于 2015-05-18T20:43:17.907 回答
4

过去,Microsoft 通过 Microsoft Transaction Server (MTS) 和后来的 COM+ 提供了一个框架来为 COM 对象进行对象池。该功能在 .NET Framework 中被继承到 System.EnterpriseServices,现在在 Windows Communication Foundation 中。

WCF 中的对象池

本文来自 .NET 1.1,但仍应适用于当前版本的框架(即使 WCF 是首选方法)。

对象池 .NET

于 2010-04-02T00:47:59.760 回答
4

我真的很喜欢 Aronaught 的实现——特别是因为他通过使用信号量来处理等待资源变得可用。我想补充几点:

  1. 将超时更改sync.WaitOne()为方法上的参数并将其公开。当线程超时等待对象变为可用时,这也需要处理条件。sync.WaitOne(timeout)Acquire(int timeout)
  2. 添加Recycle(T item)方法以处理发生故障时需要回收对象的情况,例如。
于 2010-08-23T20:27:12.940 回答
3

面向 Java,本文公开了 connectionImpl 池模式和抽象对象池模式,可能是一个很好的第一种方法: http: //www.developer.com/design/article.php/626171/Pattern-Summaries-Object-Pool。 htm

对象池模式:

图案

于 2010-03-29T14:51:27.997 回答
3

这是另一种实现,池中的对象数量有限。

public class ObjectPool<T>
    where T : class
{
    private readonly int maxSize;
    private Func<T> constructor;
    private int currentSize;
    private Queue<T> pool;
    private AutoResetEvent poolReleasedEvent;

    public ObjectPool(int maxSize, Func<T> constructor)
    {
        this.maxSize = maxSize;
        this.constructor = constructor;
        this.currentSize = 0;
        this.pool = new Queue<T>();
        this.poolReleasedEvent = new AutoResetEvent(false);
    }

    public T GetFromPool()
    {
        T item = null;
        do
        {
            lock (this)
            {
                if (this.pool.Count == 0)
                {
                    if (this.currentSize < this.maxSize)
                    {
                        item = this.constructor();
                        this.currentSize++;
                    }
                }
                else
                {
                    item = this.pool.Dequeue();
                }
            }

            if (null == item)
            {
                this.poolReleasedEvent.WaitOne();
            }
        }
        while (null == item);
        return item;
    }

    public void ReturnToPool(T item)
    {
        lock (this)
        {
            this.pool.Enqueue(item);
            this.poolReleasedEvent.Set();
        }
    }
}
于 2013-04-02T20:45:11.990 回答
1

您可以使用 NuGet 包Microsoft.Extensions.ObjectPool

这里的文档:

于 2020-05-15T14:21:32.440 回答