0

我的 C# Web API 出现以下错误:“引发的异常: System.Data.dll 中的‘System.Threading.ThreadAbortException’(线程正在被中止)”。我有一个在单独线程上长时间运行的处理过程,它通过我的数据访问逻辑类来读取和更新正在处理的记录。与此同时,用户又提交了另一个组进行处理,而该处理同样需要使用这个数据访问逻辑类,于是就出现了上述错误。下面是我目前做法的大致示意。

WebAPI 类

/// <summary>
/// Schedules loading and processing of a group on a background task and
/// returns without waiting for that work to finish.
/// </summary>
/// <param name="groupNameToProcess">Group name passed through to <c>LoadAndProcess</c>.</param>
/// <returns>
/// An OK result as soon as the work has been scheduled.
/// NOTE(review): <c>Ok()</c> assumes the enclosing class derives from ApiController - confirm.
/// </returns>
public IHttpActionResult OkToProcess(string groupNameToProcess)
{
    var logic = GetLogic();

    //Gets All Unprocessed Records and Adds them to Blocking Queue
    // The original lambda referenced an undeclared `dataAccessLogic` and was missing
    // its closing parenthesis; the `logic` instance fetched above is the one that
    // exposes LoadAndProcess.
    Task.Factory.StartNew(() => logic.LoadAndProcess(groupNameToProcess));

    return Ok();
}

/// <summary>
/// Hands a single record id to <c>StaticProcessingFactory.AddToQueue</c>.
/// </summary>
/// <param name="recordIdToProcess">Identifier of the record to enqueue.</param>
// NOTE(review): the only AddToQueue overload visible in this file takes a Message,
// not an int - confirm how the id is meant to be wrapped.
// NOTE(review): no return statement is present for the declared IHttpActionResult.
public IHttpActionResult AddToProcess(int recordIdToProcess)
{
    StaticProcessingFactory.AddToQueue(recordIdToProcess);
}

StaticProcessingFactory 类(静态处理工厂)

// Cache of Logic instances keyed by application environment; GetLogic reads and fills it.
internal static ConcurrentDictionary<ApplicationEnvironment, Logic> correctors = new ConcurrentDictionary<ApplicationEnvironment, Logic>();

// Queue of messages waiting to be processed, bounded to 2000 entries.
// The declared element type now matches the constructed collection (Message); the
// original declared BlockingCollection<CorrectionMessage>, which does not match the
// BlockingCollection<Message> it was initialised with or any other use in this file.
// Not readonly: AddToQueue assigns a fresh collection once adding has been completed.
internal static BlockingCollection<Message> MessageQueue = new BlockingCollection<Message>(2000);

/// <summary>
/// Schedules <c>LoadService</c> on a task and returns without waiting for it.
/// </summary>
public void StartService()
{
    Action loadWork = () => LoadService();
    Task.Factory.StartNew(loadWork);
}

/// <summary>
/// Obtains the shared logic instance, then runs <c>LoadAndProcess</c> for
/// "FirstGroup" and "SecondGroup", each only when its text-file-log check passes.
/// </summary>
public void LoadService()
{
    var processor = GetLogic();

    bool firstGroupAllowed = isFirstGroupOkToProcessAsPerTextFileLog();
    if (firstGroupAllowed)
    {
        processor.LoadAndProcess("FirstGroup");
    }

    // The second check runs only after the first group has been handled.
    bool secondGroupAllowed = isSeconddGroupOkToProcessAsPerTextFileLog();
    if (secondGroupAllowed)
    {
        processor.LoadAndProcess("SecondGroup");
    }
}

/// <summary>
/// Returns the cached <c>Logic</c> instance for an application environment,
/// creating and caching it on first use.
/// </summary>
/// <param name="environment">
/// Environment name to resolve. When null or empty, the
/// "DefaultApplicationEnvironment" app setting is used, which is what the
/// parameterless call did before. The parameter is optional so existing
/// <c>GetLogic()</c> calls keep working while <c>GetLogic(message.Environment)</c>
/// calls elsewhere in this file also resolve.
/// </param>
/// <returns>The <c>Logic</c> instance stored in <c>correctors</c> for that environment.</returns>
public static Logic GetLogic(string environment = null)
{
    var sqlConnectionFactory = Tools.GetSqlConnectionFactory();

    if (string.IsNullOrEmpty(environment))
    {
        environment = ConfigurationManager.AppSettings["DefaultApplicationEnvironment"];
    }

    ApplicationEnvironment applicationEnvironment =
        ApplicationEnvironmentExtensions.ToApplicationEnvironment(environment);

    // Use the factory overload so a new Logic is only constructed when the key is
    // missing; the value overload built (and discarded) one on every call.
    return correctors.GetOrAdd(applicationEnvironment, _ => new Logic(sqlConnectionFactory));
}

        /// <summary>
        /// Adds a message to <c>MessageQueue</c>, or hands it to <c>StartQueue</c> when
        /// it should be processed immediately.
        /// </summary>
        /// <param name="message">Message to enqueue or process.</param>
        /// <param name="completeAdding">
        /// When true and <c>message.ProcessImmediately</c> is set, the message is passed
        /// to <c>StartQueue</c> instead of being added directly.
        /// </param>
        public static void AddToQueue(Message message, bool completeAdding = true)
        {
            if (MessageQueue.IsAddingCompleted)
            {
                // A completed collection accepts no more items, so start a fresh one.
                // Carry the previous bound over instead of silently switching to an
                // unbounded queue; fall back to unbounded if no positive bound is reported.
                // NOTE(review): this check-then-replace takes no lock, so concurrent
                // callers can each install their own replacement queue - confirm whether
                // that is acceptable.
                int capacity = MessageQueue.BoundedCapacity;
                MessageQueue = capacity > 0
                    ? new BlockingCollection<Message>(capacity)
                    : new BlockingCollection<Message>();
            }

            if (completeAdding && message.ProcessImmediately)
            {
                StartQueue(message);
            }
            else
            {
                MessageQueue.Add(message);
            }
        }

     /// <summary>
     /// Optionally enqueues <paramref name="message"/>, then processes the leading run
     /// of queued messages that belong to the same group. Does nothing when
     /// <paramref name="message"/> is null.
     /// </summary>
     /// <param name="message">
     /// Triggering message; it is added to the queue only when its ID is non-empty.
     /// </param>
     public static void StartQueue(Message message = null)
     {
         if (message == null)
         {
             return;
         }

         if (!string.IsNullOrEmpty(message.ID))
         {
             MessageQueue.Add(message);
         }

         Logic logic = GetLogic(message.Environment);

         try
         {
             // Materialize the query once. The original left it deferred and enumerated
             // it twice (Count() and then foreach), so the count check and the processing
             // loop could observe different queue contents.
             // NOTE(review): TakeWhile is presumably the LINQ operator, which only reads
             // the collection; nothing here removes the messages from MessageQueue -
             // confirm that is intended.
             var messages = MessageQueue
                 .TakeWhile(x => logic.IsPartOfGroup(x.GroupName, message.GroupName))
                 .ToList();

             if (messages.Count > 0)
             {
                 MessageQueue.CompleteAdding();
             }

             foreach (var msg in messages)
             {
                 Process(msg);
             }
         }
         catch (InvalidOperationException)
         {
             // Same handling as before: mark the queue complete when the
             // enumeration or processing raises InvalidOperationException.
             MessageQueue.CompleteAdding();
         }
     }

     /// <summary>
     /// Loads the record identified by <c>message.ID</c>, marks it as processed and saves it.
     /// </summary>
     /// <param name="message">Message whose Environment selects the logic instance and whose ID selects the record.</param>
     public static void Process(Message message)
     {
         // `Var` in the original was a typo for the `var` keyword.
         var logic = GetLogic(message.Environment);
         var record = logic.GetRecord(message.ID);
         record.Status = Status.Processed;
         logic.Save(record);
     }

Logic 类(逻辑类)

// Data access object used by this Logic instance; created once in the constructor.
private readonly DataAccess DataAccess;

/// <summary>
/// Creates the logic object and its data access object from the given connection factory.
/// </summary>
/// <param name="factory">Connection factory passed to the <c>DataAccess</c> constructor.</param>
public Logic(SqlConnectionFactory factory)
{
     // `DataAcess` in the original was a misspelling of the DataAccess type
     // that the field is declared with.
     DataAccess = new DataAccess(factory);
}

/// <summary>
/// Fetches the records that are ready to process and submits one message per
/// record to <c>StaticProcessingFactory.AddToQueue</c>.
/// </summary>
/// <param name="groupName">Group name stamped onto every message that is created.</param>
public void LoadAndProcess(string groupName)
{
     var groups = DataAccess.GetGroups();
     var records = DataAccess.GetRecordsReadyToProcess(groups);

     for (int i = 0; i < records.Count; i++)
     {
         Message message = new Message();
         // Property name corrected from the misspelled `Enviornment`; the code that
         // reads this value elsewhere in the file uses `message.Environment`.
         message.Environment = environment.ToString();
         message.ID = records[i].ID;
         message.User = user;
         // NOTE(review): the queue filter in StartQueue reads `GroupName`, not `Group` -
         // confirm which property Message actually exposes.
         message.Group = groupName;
         message.ProcessImmediately = true;

         // completeAdding is true only for the last record, so earlier messages are
         // just enqueued and the final one triggers StartQueue.
         bool isLastRecord = i + 1 == records.Count;
         StaticProcessingFactory.AddToQueue(message, isLastRecord);
     }
}

有什么办法可以确保所有线程的请求都能正常使用数据访问逻辑,而不会导致线程被系统中止吗?

4

0 回答 0