7

我开发了一个库,它为工作项实现了生产者/消费者模式。工作被出列,并为每个出列的工作项启动一个单独的任务,其中包含失败和成功的延续。

任务继续在完成(或失败)其工作后重新排队工作项。

整个库共享一个 central CancellationTokenSource,在应用程序关闭时触发。

我现在面临重大的内存泄漏。如果任务是使用取消令牌作为参数创建的,那么这些任务似乎会保留在内存中,直到触发取消源(并在稍后处理)。

这可以在此示例代码 (VB.NET) 中重现。主要任务是包装工作项的任务,而后续任务将处理重新安排。

Dim oCancellationTokenSource As New CancellationTokenSource
Dim oToken As CancellationToken = oCancellationTokenSource.Token
Dim nActiveTasks As Integer = 0

Dim lBaseMemory As Long = GC.GetTotalMemory(True)

For iteration = 0 To 100 ' do this 101 times to see how much the memory increases

  Dim lMemory As Long = GC.GetTotalMemory(True)

  Console.WriteLine("Memory at iteration start: " & lMemory.ToString("N0"))
  Console.WriteLine("  to baseline: " & (lMemory - lBaseMemory).ToString("N0"))

  For i As Integer = 0 To 1000 ' 1001 iterations to get an immediate, measurable impact
    Interlocked.Increment(nActiveTasks)
    Dim outer As Integer = i
    Dim oMainTask As New Task(Sub()
                                ' perform some work
                                Interlocked.Decrement(nActiveTasks)
                              End Sub, oToken)
    Dim inner As Integer = 1
    Dim oFaulted As Task = oMainTask.ContinueWith(Sub()
                                                    Console.WriteLine("Failed " & outer & "." & inner)
                                                    ' if failed, do something with the work and re-queue it, if possible
                                                    ' (imagine code for re-queueing - essentially just a synchronized list.add)

                                                                                                            ' Does not help:
                                                    ' oMainTask.Dispose()
                                                  End Sub, oToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default)
    ' if not using token, does not cause increase in memory:
    'End Sub, TaskContinuationOptions.OnlyOnFaulted)

            ' Does not help:
    ' oFaulted.ContinueWith(Sub()
    '                         oFaulted.Dispose()
    '                       End Sub, TaskContinuationOptions.NotOnFaulted)


    Dim oSucceeded As Task = oMainTask.ContinueWith(Sub()
                                                      ' success
                                                      ' re-queue for next iteration
                                                      ' (imagine code for re-queueing - essentially just a synchronized list.add)

                                                                                                                ' Does not help:
                                                      ' oMainTask.Dispose()
                                                    End Sub, oToken, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default)
    ' if not using token, does not cause increase in memory:
    'End Sub, TaskContinuationOptions.OnlyOnRanToCompletion)

            ' Does not help:
    ' oSucceeded.ContinueWith(Sub()
    '                           oSucceeded.Dispose()
    '                         End Sub, TaskContinuationOptions.NotOnFaulted)


    ' This does not help either and makes processing much slower due to the thrown exception (at least one of these tasks is cancelled)
    'Dim oDisposeTask As New Task(Sub()
    '                               Try
    '                                 Task.WaitAll({oMainTask, oFaulted, oSucceeded, oFaultedFaulted, oSuccededFaulted})
    '                               Catch ex As Exception

    '                               End Try
    '                               oMainTask.Dispose()
    '                               oFaulted.Dispose()
    '                               oSucceeded.Dispose()                                     
    '                             End Sub)

    oMainTask.Start()
    '  oDisposeTask.Start()
  Next

  Console.WriteLine("Memory after creating tasks: " & GC.GetTotalMemory(True).ToString("N0"))

  ' Wait until all main tasks are finished (may not mean that continuations finished)

  Dim previousActive As Integer = nActiveTasks
  While nActiveTasks > 0
    If previousActive <> nActiveTasks Then
      Console.WriteLine("Active: " & nActiveTasks)
      Thread.Sleep(500)
      previousActive = nActiveTasks
    End If

  End While

  Console.WriteLine("Memory after tasks finished: " & GC.GetTotalMemory(True).ToString("N0"))

Next

我使用 ANTS Memory Profiler 测量了内存使用情况,发现 System.Threading.ExecutionContext 大幅增加,这可以追溯到任务延续和CancellationCallbackInfo.

如您所见,我已经尝试处理使用取消令牌的任务,但这似乎没有效果。

编辑

我正在使用 .NET 4.0

更新

即使只是将主要任务与失败的延续联系起来,内存使用量也会不断增加。任务继续似乎阻止了取消令牌注册的注销。

因此,如果一个任务与一个没有运行的延续链接在一起(由于TaskContinuationOptions),那么似乎存在内存泄漏。如果只有一个继续运行,那么我没有观察到内存泄漏。

解决方法

作为一种解决方法,我可以在没有任何内容的情况下进行单个延续,TaskContinuationOptions并在那里处理父任务的状态:

oMainTask.ContinueWith(Sub(t)
                     If t.IsCanceled Then
                       ' ignore
                     ElseIf t.IsCompleted Then
                       ' reschedule

                     ElseIf t.IsFaulted Then
                       ' error handling

                     End If
                   End Sub)

如果取消,我将不得不检查它的执行情况,但这似乎可以解决问题。我几乎怀疑 .NET Framework 中存在错误。具有互斥条件的任务取消并不罕见。

4

2 回答 2

4

一些观察

  1. 潜在的泄漏似乎只存在于存在未运行的任务“分支”的情况下。在您的示例中,如果您注释掉oFaulted任务,泄漏对我来说就消失了。如果你更新你的代码有oMainTask错误,所以oFaulted任务运行而oSucceeded任务不运行,那么注释掉oSucceeded可以防止泄漏。
  2. 也许没有帮助,但如果您oCancellationTokenSource.Cancel()在所有任务运行后调用,内存就会释放。Dispose 没有帮助,也没有任何组合 Dispose 取消源与任务。
  3. 我看了一下http://referencesource.microsoft.com/这是 4.5.2 (有没有办法查看早期框架?)我知道它不一定相同,但它有助于了解正在发生的事情类型. 基本上,当您将取消令牌传递给任务时,任务会使用取消令牌的取消源注册自己。因此,取消源包含对您所有任务的引用。我还不清楚为什么您的方案似乎正在泄漏。如果我发现任何东西,我会在有机会更深入地研究后更新。

解决方法

将您的分支逻辑移动到始终运行的延续。

Dim continuation As Task =
    oMainTask.ContinueWith(
        Sub(antecendent)
            If antecendent.Status = TaskStatus.Faulted Then
                'Handle errors
            ElseIf antecendent.Status = TaskStatus.RanToCompletion Then
                'Do something else
            End If
        End Sub,
        oToken,
        TaskContinuationOptions.None,
        TaskScheduler.Default)

无论如何,这很有可能比其他方法更轻。在这两种情况下,一个延续始终运行,但使用此代码仅创建 1 个延续任务而不是 2 个。

于 2015-06-08T06:16:48.793 回答
0

我可以通过移动这两行来解决.net 4.0下的问题

Dim oCancellationTokenSource As New CancellationTokenSource
Dim oToken As CancellationToken = oCancellationTokenSource.Token

在第一个循环内

然后在该循​​环结束时

oToken = Nothing
oCancellationTokenSource.Dispose()

我也搬了

Interlocked.Decrement(nActiveTasks)

在每个“最终”任务中,因为

While nActiveTasks > 0

不会是准确的。

这里是有效的代码

Imports System.Threading.Tasks
Imports System.Threading

Module Module1

Sub Main()
    Dim nActiveTasks As Integer = 0

    Dim lBaseMemory As Long = GC.GetTotalMemory(True)

    For iteration = 0 To 100 ' do this 101 times to see how much the memory increases
        Dim oCancellationTokenSource As New CancellationTokenSource
        Dim oToken As CancellationToken = oCancellationTokenSource.Token
        Dim lMemory As Long = GC.GetTotalMemory(True)

        Console.WriteLine("Memory at iteration start: " & lMemory.ToString("N0"))
        Console.WriteLine("  to baseline: " & (lMemory - lBaseMemory).ToString("N0"))

        For i As Integer = 0 To 1000 ' 1001 iterations to get an immediate, measurable impact
            Dim outer As Integer = iteration
            Dim inner As Integer = i

            Interlocked.Increment(nActiveTasks)

            Dim oMainTask As New Task(Sub()
                                          ' perform some work
                                      End Sub, oToken, TaskCreationOptions.None)

            oMainTask.ContinueWith(Sub()
                                       Console.WriteLine("Failed " & outer & "." & inner)
                                       Interlocked.Decrement(nActiveTasks)
                                   End Sub, oToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default)


            oMainTask.ContinueWith(Sub()
                                       If inner Mod 250 = 0 Then Console.WriteLine("Success " & outer & "." & inner)
                                       Interlocked.Decrement(nActiveTasks)
                                   End Sub, oToken, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default)


            oMainTask.Start()
        Next

        Console.WriteLine("Memory after creating tasks: " & GC.GetTotalMemory(True).ToString("N0"))


        Dim previousActive As Integer = nActiveTasks
        While nActiveTasks > 0
            If previousActive <> nActiveTasks Then
                Console.WriteLine("Active: " & nActiveTasks)
                Thread.Sleep(500)
                previousActive = nActiveTasks
            End If

        End While

        oToken = Nothing
        oCancellationTokenSource.Dispose()

        Console.WriteLine("Memory after tasks finished: " & GC.GetTotalMemory(True).ToString("N0"))

    Next

    Console.WriteLine("Final Memory after finished: " & GC.GetTotalMemory(True).ToString("N0"))

    Console.Read()
End Sub

End Module
于 2015-05-04T15:55:12.797 回答