我有一个生产者-消费者乘法任务类。我有一个方法:
private async Task Consume(CancellationToken cancellationToken){..}
有一个启动方法:
public void Run()
{
var workerCount = Session.GetParameters().GetThreadCount();
_workers = new List<Task>(workerCount);
for (var i = 0; i < workerCount; i++)
_workers.Add(Consume(StopCancellationTokenSource.Token));
Task.WhenAll(_workers).ContinueWith(_ => DoneEvent);
}
问题是DoneEvent被触发,但订阅者事件处理程序从未被执行。我成功运行事件处理程序的唯一方法是我将 Run 方法设置为async并在await before 之前添加Task.WhenAll(..)
。但随后又提出了另一个问题。在方法Consume()
中,我有一个ManualResetEvent PauseBlock
. 当它被重置时,主线程也会等待。
提前谢谢。
编辑: 我已经成功地做到了(两天后)我改变了一点 Run 方法:
public async void Run()
{
var workerCount = Session.GetParameters().GetThreadCount();
_workers = new List<Task>(workerCount);
for (var i = 0; i < workerCount; i++)
_workers.Add(Task.Run(()=> Consume(StopCancellationTokenSource.Token)));
await Task.WhenAll(_workers);
DoneEvent();
}
现在它工作正常。仅供参考消费者方法:
private async Task Consume(CancellationToken cancellationToken)
{
try
{
await Task.Delay(5000, cancellationToken);
IEngGroup engGroup;
while (Groups.TryDequeue(out engGroup))
{
cancellationToken.ThrowIfCancellationRequested();
if (!engGroup.IsEnabled || engGroup.Result.Status == ItemRunningStatus.Successful) continue;
if (engGroup.IsBreak) Pause();
//if paused wait
PauseBlock.WaitOne();
//if stoped throw
cancellationToken.ThrowIfCancellationRequested();
var groupRunner = new GroupRunner(cancellationToken, PauseBlock);
if (engGroup.FireAndForget)
groupRunner.RunGroup(engGroup);
else
await groupRunner.RunGroup(engGroup);
}
return;
}
catch (OperationCanceledException)
{
return ;
}
}
谢谢大家。如果您对 ow 有任何改进建议,我希望看到它。