由于几个未知因素,这个问题比人们预期的要复杂一些:被池化的资源的行为、对象的预期/所需生命周期、需要池的真正原因等。通常池是专用的 - 线程池、连接池等 - 因为当您确切知道资源的作用并且更重要的是可以控制该资源的实现方式时,更容易优化一个。
因为它不是那么简单,所以我试图做的是提供一种相当灵活的方法,您可以尝试并看看哪种方法最有效。 提前为这篇长文道歉,但在实施一个体面的通用资源池时,还有很多内容需要涵盖。我真的只是在摸索表面。
通用池必须有一些主要的“设置”,包括:
- 资源加载策略 - 急切或懒惰;
- 资源加载机制——如何实际构建一个;
- 访问策略——你提到的“循环”并不像听起来那么简单;此实现可以使用类似但不完美的循环缓冲区,因为池无法控制何时实际回收资源。其他选项是 FIFO 和 LIFO;FIFO 将具有更多的随机访问模式,但 LIFO 使实施最近最少使用的释放策略变得更加容易(您说这超出了范围,但仍然值得一提)。
对于资源加载机制,.NET 已经为我们提供了一个干净的抽象——委托。
private Func<Pool<T>, T> factory;
将它传递给池的构造函数,我们就完成了。使用带有new()
约束的泛型类型也可以,但这更灵活。
在其他两个参数中,访问策略是更复杂的野兽,所以我的方法是使用基于继承(接口)的方法:
public class Pool<T> : IDisposable
{
// Other code - we'll come back to this
interface IItemStore
{
T Fetch();
void Store(T item);
int Count { get; }
}
}
这里的概念很简单——我们将让公共Pool
类处理线程安全等常见问题,但对每种访问模式使用不同的“项目存储”。LIFO 很容易用堆栈表示,FIFO 是一个队列,我使用了一个不是非常优化但可能足够的循环缓冲区实现,它使用一个List<T>
和索引指针来近似循环访问模式。
下面的所有类都是 the 的内部类Pool<T>
- 这是一种样式选择,但由于这些确实不打算在 之外使用Pool
,所以它是最有意义的。
class QueueStore : Queue<T>, IItemStore
{
public QueueStore(int capacity) : base(capacity)
{
}
public T Fetch()
{
return Dequeue();
}
public void Store(T item)
{
Enqueue(item);
}
}
class StackStore : Stack<T>, IItemStore
{
public StackStore(int capacity) : base(capacity)
{
}
public T Fetch()
{
return Pop();
}
public void Store(T item)
{
Push(item);
}
}
这些是显而易见的——堆栈和队列。我认为他们真的不需要太多解释。循环缓冲区稍微复杂一点:
class CircularStore : IItemStore
{
private List<Slot> slots;
private int freeSlotCount;
private int position = -1;
public CircularStore(int capacity)
{
slots = new List<Slot>(capacity);
}
public T Fetch()
{
if (Count == 0)
throw new InvalidOperationException("The buffer is empty.");
int startPosition = position;
do
{
Advance();
Slot slot = slots[position];
if (!slot.IsInUse)
{
slot.IsInUse = true;
--freeSlotCount;
return slot.Item;
}
} while (startPosition != position);
throw new InvalidOperationException("No free slots.");
}
public void Store(T item)
{
Slot slot = slots.Find(s => object.Equals(s.Item, item));
if (slot == null)
{
slot = new Slot(item);
slots.Add(slot);
}
slot.IsInUse = false;
++freeSlotCount;
}
public int Count
{
get { return freeSlotCount; }
}
private void Advance()
{
position = (position + 1) % slots.Count;
}
class Slot
{
public Slot(T item)
{
this.Item = item;
}
public T Item { get; private set; }
public bool IsInUse { get; set; }
}
}
我本可以选择许多不同的方法,但最重要的是资源应该按照创建它们的顺序访问,这意味着我们必须维护对它们的引用但将它们标记为“正在使用”(或不)。在最坏的情况下,只有一个插槽可用,并且每次提取都需要对缓冲区进行完整迭代。如果您汇集了数百个资源并且每秒获取和释放它们多次,那么这很糟糕;对于 5 到 10 个项目的池来说,这并不是一个真正的问题,在典型的情况下,资源消耗很少,它只需要提前一两个插槽。
请记住,这些类是私有内部类——这就是为什么它们不需要大量的错误检查,池本身限制对它们的访问。
抛出一个枚举和一个工厂方法,我们就完成了这部分:
// Outside the pool
public enum AccessMode { FIFO, LIFO, Circular };
private IItemStore itemStore;
// Inside the Pool
private IItemStore CreateItemStore(AccessMode mode, int capacity)
{
switch (mode)
{
case AccessMode.FIFO:
return new QueueStore(capacity);
case AccessMode.LIFO:
return new StackStore(capacity);
default:
Debug.Assert(mode == AccessMode.Circular,
"Invalid AccessMode in CreateItemStore");
return new CircularStore(capacity);
}
}
下一个要解决的问题是加载策略。我定义了三种类型:
public enum LoadingMode { Eager, Lazy, LazyExpanding };
前两个应该是不言自明的;第三种是混合的,它延迟加载资源,但实际上直到池满才开始重新使用任何资源。如果您希望池已满(听起来像您这样做)但希望将实际创建它们的费用推迟到第一次访问(即改善启动时间),这将是一个很好的权衡。
加载方法真的不是太复杂,现在我们有了 item-store 抽象:
private int size;
private int count;
private T AcquireEager()
{
lock (itemStore)
{
return itemStore.Fetch();
}
}
private T AcquireLazy()
{
lock (itemStore)
{
if (itemStore.Count > 0)
{
return itemStore.Fetch();
}
}
Interlocked.Increment(ref count);
return factory(this);
}
private T AcquireLazyExpanding()
{
bool shouldExpand = false;
if (count < size)
{
int newCount = Interlocked.Increment(ref count);
if (newCount <= size)
{
shouldExpand = true;
}
else
{
// Another thread took the last spot - use the store instead
Interlocked.Decrement(ref count);
}
}
if (shouldExpand)
{
return factory(this);
}
else
{
lock (itemStore)
{
return itemStore.Fetch();
}
}
}
private void PreloadItems()
{
for (int i = 0; i < size; i++)
{
T item = factory(this);
itemStore.Store(item);
}
count = size;
}
上面的size
和count
字段分别表示池的最大大小和池拥有的资源总数(但不一定可用)。 AcquireEager
是最简单的,它假设一个项目已经在商店中 - 这些项目将在构建时预加载,即在PreloadItems
最后显示的方法中。
AcquireLazy
检查池中是否有空闲项目,如果没有,则创建一个新项目。 AcquireLazyExpanding
只要池尚未达到其目标大小,就会创建一个新资源。我已经尝试对此进行优化以最小化锁定,我希望我没有犯任何错误(我已经在多线程条件下对此进行了测试,但显然并非详尽无遗)。
您可能想知道为什么这些方法都不费心检查商店是否已达到最大大小。一会儿我会讲到的。
现在是游泳池本身。这是完整的私有数据集,其中一些已经显示:
private bool isDisposed;
private Func<Pool<T>, T> factory;
private LoadingMode loadingMode;
private IItemStore itemStore;
private int size;
private int count;
private Semaphore sync;
回答我在上一段中忽略的问题 - 如何确保我们限制创建的资源总数 - 事实证明 .NET 已经有一个非常好的工具,它被称为Semaphore,它专门设计用于允许固定访问资源的线程数(在这种情况下,“资源”是内部项目存储)。由于我们没有实现完整的生产者/消费者队列,这完全可以满足我们的需求。
构造函数如下所示:
public Pool(int size, Func<Pool<T>, T> factory,
LoadingMode loadingMode, AccessMode accessMode)
{
if (size <= 0)
throw new ArgumentOutOfRangeException("size", size,
"Argument 'size' must be greater than zero.");
if (factory == null)
throw new ArgumentNullException("factory");
this.size = size;
this.factory = factory;
sync = new Semaphore(size, size);
this.loadingMode = loadingMode;
this.itemStore = CreateItemStore(accessMode, size);
if (loadingMode == LoadingMode.Eager)
{
PreloadItems();
}
}
这里应该不足为奇。唯一需要注意的是急切加载的特殊情况,使用PreloadItems
前面已经显示的方法。
由于现在几乎所有内容都已被干净地抽象出来,因此实际Acquire
和Release
方法非常简单:
public T Acquire()
{
sync.WaitOne();
switch (loadingMode)
{
case LoadingMode.Eager:
return AcquireEager();
case LoadingMode.Lazy:
return AcquireLazy();
default:
Debug.Assert(loadingMode == LoadingMode.LazyExpanding,
"Unknown LoadingMode encountered in Acquire method.");
return AcquireLazyExpanding();
}
}
public void Release(T item)
{
lock (itemStore)
{
itemStore.Store(item);
}
sync.Release();
}
如前所述,我们使用Semaphore
来控制并发性,而不是虔诚地检查项目商店的状态。只要获得的物品被正确释放,就没有什么可担心的。
最后但并非最不重要的是,有清理:
public void Dispose()
{
if (isDisposed)
{
return;
}
isDisposed = true;
if (typeof(IDisposable).IsAssignableFrom(typeof(T)))
{
lock (itemStore)
{
while (itemStore.Count > 0)
{
IDisposable disposable = (IDisposable)itemStore.Fetch();
disposable.Dispose();
}
}
}
sync.Close();
}
public bool IsDisposed
{
get { return isDisposed; }
}
该IsDisposed
属性的用途将在稍后变得清晰。所有主要Dispose
方法真正做的是处置实际的池项目(如果它们实现IDisposable
)。
现在你基本上可以按原样使用这个try-finally
块,但我不喜欢那种语法,因为如果你开始在类和方法之间传递池资源,那么它会变得非常混乱。使用资源的主类可能甚至没有对池的引用。它确实变得相当混乱,因此更好的方法是创建一个“智能”池对象。
假设我们从以下简单的接口/类开始:
public interface IFoo : IDisposable
{
void Test();
}
public class Foo : IFoo
{
private static int count = 0;
private int num;
public Foo()
{
num = Interlocked.Increment(ref count);
}
public void Dispose()
{
Console.WriteLine("Goodbye from Foo #{0}", num);
}
public void Test()
{
Console.WriteLine("Hello from Foo #{0}", num);
}
}
这是我们假装的一次性Foo
资源,它实现IFoo
并具有一些用于生成唯一身份的样板代码。我们要做的是创建另一个特殊的池化对象:
public class PooledFoo : IFoo
{
private Foo internalFoo;
private Pool<IFoo> pool;
public PooledFoo(Pool<IFoo> pool)
{
if (pool == null)
throw new ArgumentNullException("pool");
this.pool = pool;
this.internalFoo = new Foo();
}
public void Dispose()
{
if (pool.IsDisposed)
{
internalFoo.Dispose();
}
else
{
pool.Release(this);
}
}
public void Test()
{
internalFoo.Test();
}
}
这只是将所有“真实”方法代理到它的内部IFoo
(我们可以使用像 Castle 这样的动态代理库来做到这一点,但我不会深入探讨)。它还维护对Pool
创建它的对象的引用,因此当我们创建Dispose
这个对象时,它会自动将自己释放回池中。 除非池已经被释放——这意味着我们处于“清理”模式,在这种情况下,它实际上是清理内部资源。
使用上面的方法,我们可以编写如下代码:
// Create the pool early
Pool<IFoo> pool = new Pool<IFoo>(PoolSize, p => new PooledFoo(p),
LoadingMode.Lazy, AccessMode.Circular);
// Sometime later on...
using (IFoo foo = pool.Acquire())
{
foo.Test();
}
这是一件非常好的事情。这意味着使用的代码IFoo
(而不是创建它的代码)实际上不需要知道池。你甚至可以使用你最喜欢的 DI 库和作为提供者/工厂来注入对象。 IFoo
Pool<T>
我已将完整的代码放在 PasteBin 上,供您享受复制和粘贴的乐趣。还有一个简短的测试程序,您可以使用它来玩弄不同的加载/访问模式和多线程条件,以使您自己确信它是线程安全的并且没有错误。
如果您对此有任何疑问或疑虑,请告诉我。