我有一个方法必须遍历大量数据并将处理后的结果返回给消费者线程以进行序列化。流式 Plinq 最适合性能。
因为这些操作很频繁,所以我使用对象池来缓存容器以进行处理,以尽量减少对象创建。我尝试使用并发堆栈实现对象池(concurrentbag 和 concurrentqueue 表现出相同的问题)。在同样罕见的情况下,同一个线程从池中获取同一个项目(查看哈希码),尽管它没有被消费者线程释放。我在池的获取和释放方法中添加了跟踪,这是输出:
5:11:32.250 PM 为线程 31 获取项目 16071020
5:11:32.254 PM 为线程 31 获取项目 16071020 5:11:32.260 PM
为线程 27 放置项目 16071020
5:11:32.286 PM 为线程 27 放置项目 16071020
这是我正在使用的代码:
var itemsToProcess = data.AsParallel()
.Where(x => Filter(x))
.Select(row => Process(row));
在 Process 方法中,我将从池中获取对象:
result = ObjectPool.Instance.GetObject();
池类实现:
public class ObjectPool
{
private ConcurrentStack<object[]> _objects;
private int size;
private const int maxSize = 20000;
private static ObjectPool instance = new ObjectPool(500);
public static ObjectPool Instance
{
get { return instance; }
}
private ObjectPool(int size)
{
this.size = size;
_objects = new ConcurrentStack<object[]>();
}
public object[] GetObject()
{
object[] item;
if (_objects.TryPop(out item))
{
Trace.WriteLine(string.Format("Get item {0} for Thread {1}", item.GetHashCode(), Thread.CurrentThread.ManagedThreadId));
return item;
}
return new object[size];
}
public void Clear()
{
_objects.Clear();
}
public void PutObject(object[] item)
{
Trace.WriteLine(string.Format("Put item {0} for Thread {1}", item.GetHashCode(), Thread.CurrentThread.ManagedThreadId));
if (_objects.Count < maxSize)
{
_objects.Push(item);
}
}
}
我不知道如何防止这种情况的发生。关于为什么会发生这种情况以及如何防止它的任何想法?