0

我已经苦苦挣扎了 2 天,为我的简单场景找到合适的解决方案。

场景我想
在 .Net Core Parallel 循环中完成多个数据库访问,该循环将在同一个数据库中插入多个项目。

var actions = new List<Action>();

actions.Add(() => { new DbContext.Set<TEntity>().Add(entity); });
actions.Add(() => { new DbContext.Set<TEntity>().Add(entity); });
actions.Add(() => { new DbContext.Set<TEntity>().Add(entity); });
actions.Add(() => { new DbContext.Set<TEntity>().Add(entity); });

Parallel.ForEach(actions, new ParallelOptions { MaxDegreeOfParallelism = 2 },
action =>
{
     action();
});

已知限制

  1. EF Core 的 DBContext 不是线程安全的(我们需要在每个 trhead 中重新创建它们)
  2. MySQL 服务器不接受使用同一事务建立的 2 个连接
  3. 似乎我们无法将已经打开的连接传递给 DBContext
  4. EF Core 每次与数据库交互时都会在内部打开和关闭与数据库的连接

例外

当前不支持多个同时连接或在同一事务中具有不同连接字符串的连接。

在 MySql.Data.MySqlClient.MySqlConnection.Open() 在 Microsoft.EntityFrameworkCore.Query.Internal.QueryingEnumerable 在 Microsoft.EntityFrameworkCore.Storage.RelationalConnection.Open(Boolean errorsExpected) 在 Microsoft.EntityFrameworkCore.Storage.RelationalConnection.OpenDbConnection(Boolean errorsExpected) 1.Enumerator.BufferlessMoveNext(DbContext _, Boolean buffer) at Microsoft.EntityFrameworkCore.Query.Internal.QueryingEnumerable1.Enumerator.MoveNext() 在 Microsoft.EntityFrameworkCore.Query.Internal.LinqOperatorProvider._TrackEntities[TOut,TIn](IEnumerable 1 results, QueryContext queryContext, IList1 entityTrackingInfos, IList 1 entityAccessors)+MoveNext() at Microsoft.EntityFrameworkCore.Query.Internal.LinqOperatorProvider.ExceptionInterceptor1.EnumeratorExceptionInterceptor.MoveNext() 在 System.Collections.Generic.List1.AddEnumerable(IEnumerable1 个可枚举)在 System.Linq.Enumerable.ToList[TSource](IEnumerable`1 源)的 CloseTheMonth.Backend.Data.Repositories.AccountUserRightRepository.ListAccounts(Guid userId) 在 C:\Work\GitHub\CloseTheMonth\Backend\CloseTheMonth。 Data\Repositories\AccountUserRightRepository.cs:C:\Work\GitHub\CloseTheMonth\Backend\CloseTheMonth.Services\AccountUserRightService.cs:CloseTheMonth.Backend.Services.AccountUserRightService.ListAccounts(Guid userId) 的第 44 行:CloseTheMonth.Backend 的第 53 行.Controllers.AppController.Init(字符串授权,AppInitRequest 请求)在 C:\Work\GitHub\CloseTheMonth\Backend\CloseTheMonth.Backend\Controllers\AppController.cs:line 101

反思...
如果我可以像这样在全局范围内打开一个连接,并将其传递给我的 DBContexts,那就可以了。但我检查了 EF Core 和 Pomelo 源代码,但没有找到实现类似目标的方法。

也许除了 Pomelo 之外的其他一些 EF Core MySQL 驱动程序可以做到这一点?

var actions = new List<Action>();

using (var conn = new MySqlConnection())
{
   actions.Add(() => { new DbContext(conn).Set<TEntity>().Add(entity); });
   actions.Add(() => { new DbContext(conn).Set<TEntity>().Add(entity); });
   actions.Add(() => { new DbContext(conn).Set<TEntity>().Add(entity); });
   actions.Add(() => { new DbContext(conn).Set<TEntity>().Add(entity); });

   Parallel.ForEach(actions, new ParallelOptions { MaxDegreeOfParallelism = 2 },
   action =>
   {
        action();
   });
}

我将 MySQL 服务器 (8.0.22) 与 Pomelo.EntityFrameworkCore.MySql (2.1.4) 一起使用

4

2 回答 2

2

如果代码不是线程安全的,则必须在每个线程中使用单独的 MySQL 连接。

MySQL 协议是有状态的,因此如果查询-响应周期的一部分与针对不同查询的不同查询-响应周期交错,则响应会变得混乱。你不会喜欢这个结果。

设计使用数据库的多线程代码的唯一明智方法是让每个线程打开自己的连接。

于 2021-03-01T17:22:22.073 回答
0

现在一切都清楚了,让我继续说简单的:

  1. 无法使用 EF Core 和 MySQL 进行多线程写入操作
  2. 可以使用 EF Core 和 MySQL 进行多线程读取操作

一、写操作

  • DBContext 不是线程安全的,每个线程需要一个上下文
  • 每次您使用 Context.SaveChanges 时,它都会打开和关闭一个连接
  • MySQL 拒绝在事务中打开多个连接
  • 您不能在同一个数据库连接上执行多线程操作

由于您不能多线程插入/更新/删除,因此您当然可以通过避免每次触摸实体时调用 SaveChanges 并等到提交事务之前对其进行一些优化。

首先,只要您不保存更改,它将允许 EF 留在内存中。

其次,如果 EF Core 必须进行任何优化以喜欢、批量插入或任何可能完成的事情,它将能够做到,因为您将所有数据库作业保留到最后(所以 EF 知道要做的工作)。

二、读操作

  • DBContext 不是线程安全的,每个线程需要一个上下文
  • 您需要在每个线程下创建一个范围
  • 依赖注入将为每个线程范围创建一个新的 UnitOfWork(因为您在 Startup 类中将其定义为 Scoped )
  • 您同时执行所有线程,EF Core 将处理多个连接

对于读取操作,由于我希望代码看起来干净,这就是我所做的:

public class Multithreader : IDisposable
{
    private List<Action> _actions = new List<Action>();

    public Multithreader(int maxThreads)
    {
        this._maxThreads = maxThreads;
    }

    public void Enqueue(Action action)
    {
        this._actions.Add(action);
    }

    public void Dispose()
    {
        Parallel.ForEach(this._actions, new ParallelOptions { MaxDegreeOfParallelism = 8 },
        action =>
        {
            action();
        });
    }
}

我还在我的 BaseController 中创建了一个辅助函数来获取一个 Scoped Services 类(它具有对我的服务的引用):

public class BaseController : ControllerBase
{
    private readonly IServiceProvider _serviceProvider;

    public BaseController(IServiceProvider serviceProvider)
    {
        this._serviceProvider = serviceProvider;
    }

    protected IServices GetScopedServices()
    {
        var scope = _serviceProvider.CreateScope();

        return scope.ServiceProvider.GetService<IServices>();
    }
}

然后我只是把所有东西都排入队列以获得我想要的东西:

using (var threader = new Multithreader())
{
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value1 = services.Accounts.GetValue1(); } });
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value2 = services.Accounts.GetValue2(); } });
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value3 = services.Accounts.GetValue3(); } });
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value4 = services.Accounts.GetValue4(); } });
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value5 = services.Accounts.GetValue5(); } });
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value6 = services.Accounts.GetValue6(); } });
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value7 = services.Accounts.GetValue7(); } });
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value8 = services.Accounts.GetValue8(); } });
    threader.Enqueue(() => { using (var services = this.GetScopedServices()) { entity.Value9 = services.Accounts.GetValue9(); } });
}
于 2021-03-01T23:28:01.107 回答