4

找到了答案,...代码中的错误是我将数据添加到错误的列表中


我知道这已被问过几次,但我根本无法弄清楚为什么这会出现异常,我是否在某处丢失了锁?

var sendsToday = new List<TSend>();

var threads = _maxNumberOfThreads;
if (threads <= 0)
    threads = 1;

Parallel.ForEach(_subscribers,
    new ParallelOptions { MaxDegreeOfParallelism = threads },
    () => new ContentSendLocalStorage<TSend, TMedium>(_contentServices, _logService),
    (subscriber, loopState, localStorage) =>
    {
        localStorage.LogService.Warning(string.Format("Choosing content for subscriber {0}", subscriber.SubscriberId));
        foreach (var newsletterId in subscriber.SubscribedNewsletterIds)
        {
            localStorage.LogService.Warning(string.Format("Choosing content for newsletter {0}", newsletterId));
            var clicks = StateBag.Get<LookupList>(StateKeys.LookupList).Clicks.Where(c => c.Subscriber.SubscriberId == subscriber.SubscriberId).Select(c => c.Content.ContentId).ToArray();

            foreach (var contentService in _contentServices.Where(contentService => contentService.Contents.Count != 0))
            {
                subscriber.UrlsClicked = contentService.Contents
        .Where(c => clicks.Contains(c.ContentId))
        .GroupBy(g => g.Page.Url)
        .ToDictionary(k => k.Key, v => 1);

                var best = contentService.GetBestForSubscriber(subscriber, new TMedium { MediumId = int.Parse(newsletterId) });
                if (best != null)
        sendsToday.Add(best);
            }
        }

        localStorage.LogService.Warning(string.Format("Done choosing content for subscriber {0}", subscriber.SubscriberId));
        return localStorage;
    },
    finalStorage =>
    {
        lock (ContentSendLock)
        {
            sendsToday.AddRange(finalStorage.SubscriberSends);
        }
    });

我不断收到以下异常:

System.AggregateException: One or more errors occurred. ---> System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at System.Collections.Generic.List`1.Add(T item)
   at WFM.Newsletter.Business.CORE.OptimizationEngine`2.<>c__DisplayClassd.<Start>b__4(Subscriber subscriber, ParallelLoopState loopState, ContentSendLocalStorage`2 localStorage)
   at System.Threading.Tasks.Parallel.<>c__DisplayClass21`2.<ForEachWorker>b__1a(Int32 i, ParallelLoopState state, TLocal local)
   at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](TSource[] array, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEach[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Func`1 localInit, Func`4 body, Action`1 localFinally)
   at WFM.Newsletter.Business.CORE.OptimizationEngine`2.Start()
   at WFM.Newsletter.Business.CORE.OptimizationService.Start(IEnumerable`1 subscribers, IEnumerable`1 newsletters, LookupList lookupList, LocalConfig config, ILogging logService, Int32 maxNumberOfThreads, String subscriberClicksSourceTableName)
---> (Inner Exception #0) System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at System.Collections.Generic.List`1.Add(T item)
   at WFM.Newsletter.Business.CORE.OptimizationEngine`2.<>c__DisplayClassd.<Start>b__4(Subscriber subscriber, ParallelLoopState loopState, ContentSendLocalStorage`2 localStorage)
   at System.Threading.Tasks.Parallel.<>c__DisplayClass21`2.<ForEachWorker>b__1a(Int32 i, ParallelLoopState state, TLocal local)
   at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )<---

---> (Inner Exception #1) System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at System.Collections.Generic.List`1.Add(T item)
   at WFM.Newsletter.Business.CORE.OptimizationEngine`2.<>c__DisplayClassd.<Start>b__4(Subscriber subscriber, ParallelLoopState loopState, ContentSendLocalStorage`2 localStorage)
   at System.Threading.Tasks.Parallel.<>c__DisplayClass21`2.<ForEachWorker>b__1a(Int32 i, ParallelLoopState state, TLocal local)
   at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )<---

---> (Inner Exception #2) System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at System.Collections.Generic.List`1.Add(T item)
   at WFM.Newsletter.Business.CORE.OptimizationEngine`2.<>c__DisplayClassd.<Start>b__4(Subscriber subscriber, ParallelLoopState loopState, ContentSendLocalStorage`2 localStorage)
   at System.Threading.Tasks.Parallel.<>c__DisplayClass21`2.<ForEachWorker>b__1a(Int32 i, ParallelLoopState state, TLocal local)
   at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )<---
4

3 回答 3

2

在您的内部 foreach 中,您将添加到作为 List<> 的 sendToday。这发生在每个线程上。

if (best != null)
    sendsToday.Add(best);
}

尝试使用System.Collections.Concurrent实现之一(例如 ConcurrentBag),而不是使用 List<>。它为您在内部处理所有锁定,并且专为线程而设计。

var sendsToday = new ConcurrentBag<TSend>();
于 2012-10-11T20:04:48.180 回答
2

找到了答案,我既不需要改变我的逻辑来引入 ConcurrentBag,也没有大容量插入的问题。

代码中的错误是我将数据添加到错误的列表中,即 sendToday.Add(best);

相反,我不得不将它添加到“localstorage”中,由.Net 本身在我的最终存储中使用锁来处理。

所以而不是

 if (best != null)
        sendsToday.Add(best);

正确的代码是

if (best != null)
localStorage.SubscriberSends.Add(best);  

最终存储最终将每个与锁定机制合并。

无论如何感谢所有的支持!

于 2012-10-11T21:39:02.930 回答
1

由于生成异常的方法是List<T>.Add()我的猜测异常在这里抛出:

sendsToday.Add(best);

在列表周围添加一个lock访问权限,看看是否有帮助。

于 2012-10-11T20:05:18.917 回答