我在 SO 上找到了类似的线程,但似乎没有一个能解决我的确切问题。
基本上,我使用 .NET 任务并行库编写了一个简单的生产者/消费者应用程序。生产者每隔 30 秒检查一次数据库表中的记录。当它找到记录时,它会将它们添加到BlockingQueue
. 同时,我用来Task.Factory
对记录执行一些操作。
当即将对记录执行操作时,我想更新记录上的字段以指示它已经在队列中。这样一来,如果生产者在最后一批仍在处理时检查新记录,它就不会将旧条目添加回队列。
SubmitChanges()
我遇到的这个问题是从多个线程调用我的数据上下文。我想我正在参加比赛,但我不确定。
我得到的错误是The operation cannot be performed during a call to SubmitChanges.
生产者代码:
BlockingCollection<QueuedMessage> workItems = new BlockingCollection<QueuedMessage>();
System.Threading.Timer workItemTimer = new System.Threading.Timer((s) =>
{
var items = repository.GetQueuedMessages();
foreach (var item in items)
{
workItems.Add(item);
}
}, null, 0, 30000);
消费者守则:
while (workItems.TryTake(out queuedMessage, Timeout.Infinite, new CancellationToken()))
{
Task.Factory.StartNew((t) =>
{
var messageToSend = (QueuedMessage)t;
repository.MarkQueuedMessageAsProcessing(messageToSend.Id);
...
Do some stuff with messageToSend
....
}, queuedMessage);
}
存储库代码:
var entity = DataContext.QueuedMessages.SingleOrDefault(m => m.Id == messageId);
entity.ProcessingStarted = true;
DataContext.SubmitChanges();
当我使用一些排队的消息运行它时,DataContext.SubmitChanges()
将开始抛出我之前提到的消息的异常,The operation cannot be performed during a call to SubmitChanges.
就像我说的,我认为这是因为我从多个线程调用它,但我不知道如何解决这个问题。
我尝试将有问题的行更改为:
ThreadPool.QueueUserWorkItem(s => DataContext.SubmitChanges());
但结果是一样的。