22

我有一个 ID 列表,我需要在每个 ID 上运行多个存储过程。

当我使用标准的 foreach 循环时,它可以正常工作,但是当我有很多记录时,它的工作速度很慢。

我想将代码转换为与 EF 一起使用,但出现异常:“底层提供程序在打开时失败”。

我在 Parallel.ForEach 内使用此代码:

using (XmlEntities osContext = new XmlEntities())
{
    //The code
}

但它仍然抛出异常。

知道如何将 Parallel 与 EF 一起使用吗?我需要为我正在运行的每个过程创建一个新的上下文吗?我有大约 10 个程序,所以我认为创建 10 个上下文非常糟糕,每个上下文一个。

4

5 回答 5

42

Entity Framework 使用的底层数据库连接不是线程安全的。您需要为要执行的另一个线程上的每个操作创建一个新上下文。

您对如何并行化操作的担忧是有道理的;许多上下文的打开和关闭成本很高。

相反,您可能想要颠倒您对并行化代码的想法。似乎您正在循环多个项目,然后为每个项目串行调用存储过程。

如果可以,为每个过程Task<TResult>创建一个新的(或者Task,如果您不需要结果),然后在其中打开一个上下文,遍历所有项目,然后执行存储过程。这样,您只有与并行运行的存储过程数量相等的上下文数量。Task<TResult>

假设您有一个MyDbContext带有两个存储过程的DoSomething1DoSomething2,它们都采用一个类的实例,MyItem

实现上述内容如下所示:

// You'd probably want to materialize this into an IList<T> to avoid
// warnings about multiple iterations of an IEnumerable<T>.
// You definitely *don't* want this to be an IQueryable<T>
// returned from a context.
IEnumerable<MyItem> items = ...;

// The first stored procedure is called here.
Task t1 = Task.Run(() => { 
    // Create the context.
    using (var ctx = new MyDbContext())
    // Cycle through each item.
    foreach (MyItem item in items)
    {
        // Call the first stored procedure.
        // You'd of course, have to do something with item here.
        ctx.DoSomething1(item);
    }
});

// The second stored procedure is called here.
Task t2 = Task.Run(() => { 
    // Create the context.
    using (var ctx = new MyDbContext())
    // Cycle through each item.
    foreach (MyItem item in items)
    {
        // Call the first stored procedure.
        // You'd of course, have to do something with item here.
        ctx.DoSomething2(item);
    }
});

// Do something when both of the tasks are done.

如果您不能并行执行存储过程(每个都依赖于以特定顺序运行),那么您仍然可以并行化您的操作,只是稍微复杂一点。

您将考虑在您的项目之间创建自定义分区(使用class上的静态Create方法)。这将为您提供获得实现的方法(注意,这不是您无法克服的)。PartitionerIEnumerator<T> IEnumerable<T>foreach

对于您返回的每个IEnumerator<T>实例,您将创建一个新实例Task<TResult>(如果您需要一个结果),并且在Task<TResult>正文中,您将创建上下文,然后循环通过 . 返回的项目IEnumerator<T>,按顺序调用存储过程。

看起来像这样:

// Get the partitioner.
OrdinalPartitioner<MyItem> partitioner = Partitioner.Create(items);

// Get the partitions.
// You'll have to set the parameter for the number of partitions here.
// See the link for creating custom partitions for more
// creation strategies.
IList<IEnumerator<MyItem>> paritions = partitioner.GetPartitions(
    Environment.ProcessorCount);

// Create a task for each partition.
Task[] tasks = partitions.Select(p => Task.Run(() => { 
        // Create the context.
        using (var ctx = new MyDbContext())
        // Remember, the IEnumerator<T> implementation
        // might implement IDisposable.
        using (p)
        // While there are items in p.
        while (p.MoveNext())
        {
            // Get the current item.
            MyItem current = p.Current;

            // Call the stored procedures.  Process the item
            ctx.DoSomething1(current);
            ctx.DoSomething2(current);
        }
    })).
    // ToArray is needed (or something to materialize the list) to
    // avoid deferred execution.
    ToArray();
于 2012-10-10T20:28:38.077 回答
6

EF 不是线程安全的,所以不能使用 Parallel。

看看实体框架和多线程

和这篇文章

于 2012-10-10T20:28:09.953 回答
3

这就是我使用的并且效果很好。它还支持处理错误异常并具有调试模式,这使得跟踪事情变得更加容易

public static ConcurrentQueue<Exception> Parallel<T>(this IEnumerable<T> items, Action<T> action, int? parallelCount = null, bool debugMode = false)
{
    var exceptions = new ConcurrentQueue<Exception>();
    if (debugMode)
    {
        foreach (var item in items)
        {
            try
            {
                action(item);
            }
            // Store the exception and continue with the loop.                     
            catch (Exception e)
            {
                exceptions.Enqueue(e);
            }
        }
    }
    else
    {
        var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() =>
        {
            while (partition.MoveNext())
            {
                try
                {
                    action(partition.Current);
                }
                // Store the exception and continue with the loop.                     
                catch (Exception e)
                {
                    exceptions.Enqueue(e);
                }
            }
        }));
        Task.WaitAll(partitions.ToArray());
    }
    return exceptions;
}

您可以像下面这样使用它,其中 db 是原始 DbContext 并且 db.CreateInstance() 使用相同的连接字符串创建一个新实例。

        var batch = db.Set<SomeListToIterate>().ToList();
        var exceptions = batch.Parallel((item) =>
        {
            using (var batchDb = db.CreateInstance())
            {
                var batchTime = batchDb.GetDBTime();
                var someData = batchDb.Set<Permission>().Where(x=>x.ID = item.ID).ToList();
                //do stuff to someData
                item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called
                batchDb.SaveChanges();        
            }                
        });
        if (exceptions.Count > 0)
        {
            logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions));
            throw new AggregateException(exceptions); //optionally throw an exception
        }
        db.SaveChanges(); //save the item modifications
于 2016-02-26T17:26:21.613 回答
0

在不知道内部异常结果(如果有的话)的情况下解决这个问题有点困难。这可能只是连接字符串或提供程序配置设置方式的问题。

通常,您必须小心并行代码和 EF。但是,您正在做的事情-应该-工作。我心中的一个问题;在并行之前,是否正在对该上下文的另一个实例进行任何工作?根据您的帖子,您正在每个线程中执行单独的上下文。那挺好的。然而,我的一部分想知道在多个上下文之间是否存在一些有趣的构造函数争用。如果您在该并行调用之前没有在任何地方使用该上下文,我建议您尝试对上下文运行一个简单的查询以打开它,并确保在运行并行方法之前启动所有 EF 位。我承认,我还没有完全尝试过你在这里所做的事情,但我已经做得很接近并且很有效。

于 2015-03-20T01:11:18.313 回答
-1

这是我为解决我的问题所做的事情。我试图在我的 Parallel.ForEach 中获得一个与 EF 有独特连接的订单,这就是我所坚持的,但并不是真的,我每次都有相同的最后一个订单。在操作结束时删除使用并关闭连接后,它工作得很好。

简而言之,如果您使用 using,请不要在 Parallel.Foreach 内的操作结束时关闭连接

我确实替换了这个

using ApplicationDbContext Context = CubiCommon.GetContext<ApplicationDbContext>("APIConnection");

这样

ApplicationDbContext Context = CubiCommon.GetContext<ApplicationDbContext>("APIConnection");

并在操作结束时添加

Context.Dispose();
Context = null;

也许这可以解决类似的问题

于 2021-09-24T18:24:39.280 回答