1

这是我之前的问题的演变。回顾一下,我需要使用第 3 方 WebSocket,其中请求以一种方法发送,响应以另一种方法给出。我正在尝试将其转换为Task.

我阅读了这篇 MSDN 杂志文章,其中展示了一个使用 aSemaphoreSlim来控制条目的示例,但我的情况并不完全相同,我不确定最好的做法是什么。

我可以将它放在文章中所示SemaphoreSlim.Release()finally块中GetPositionsAsync(),但是如果出现错误,那么 finally 可能会在之前被调用,如果有另一个线程在等待信号量positionEnd(),则会导致错误。GetPositionsAsync()另一方面,如果我放入,SemaphoreSlim.Release()positionEnd()可以可靠地假设它GetPositionsAsync()会返回正确的结果吗?我担心如果释放发生,一个等待信号量的线程可能会Positions.Clear()在另一个线程保存 to 的值之前Positions调用result。所以无论哪种方式,实际上都是同样的担忧。哪个更好,或者有其他方法可以保护这个问题不一起发生?

这是我目前所拥有的......

private TaskCompletionSource<List<Position>> PositionsTaskSource { get; set; }
private readonly SemaphoreSlim PositionsMutex = new(1, 1);

public async Task<List<Position>> GetPositionsAsync()
{
    try
    {
        await PositionsMutex.WaitAsync().ConfigureAwait(false);
        Positions.Clear();
        PositionsTaskSource = new();
        IbWebSocket.reqPositions();
        var result = await PositionsTaskSource.Task;
        //Write to Trace Log here
        return result;
    }
    finally
    {
        //I could put the Release here as shown in the article
        //PositionsMutex.Release(); 
    }
}

/// <summary>
///        Provides a position to the reqPositions() method.  When the last position has been received positionEnd() is called.
/// </summary>
/// <param name="account"></param>
/// <param name="contract"></param>
/// <param name="value"></param>
public void position(string account, Contract contract, double value)
{
    Positions.Add(new Position(account, contract, value));
}

/// <summary>
///     Indicates all the positions have been transmitted.
/// </summary>
public void positionEnd()
{
    PositionsTaskSource.TrySetResult(Positions))
    PositionsMutex.Release();
}
4

2 回答 2

0

正如 MSDN 文章和NickRelease所述,将内部称为GetPositionsAsync'finally块就足够了。

为了更好地表达您的意图,我建议您对代码稍作改动:

  • 而不是使用TaskCompletionSource<List<Position>>
  • 我建议使用TaskCompletionSource<object>
private TaskCompletionSource<object> PositionsHasBeenPopulated;
private readonly SemaphoreSlim GetPositionAsyncMutex = new(1, 1);

public async Task<List<Position>> GetPositionsAsync()
{
    try
    {
        await GetPositionAsyncMutex.WaitAsync().ConfigureAwait(false);
        Positions.Clear();
        PositionsHasBeenPopulated = new();
        IbWebSocket.reqPositions();
        
        await PositionsHasBeenPopulated.Task;
        return Positions;
    }
    finally
    {
        GetPositionAsyncMutex.Release();
    }
}


public void position(string account, Contract contract, double value)
    => Positions.Add(new Position(account, contract, value));

public void positionEnd()
    => PositionsHasBeenPopulated.TrySetResult(null);
  • GetPositionAsyncMutex并发调用限制为一个
  • PositionsHasBeenPopulated用于信令
    • 您可以将TaskCompletionSource<object>其视为异步版本ManualResetEvent
于 2021-03-24T11:38:47.250 回答
0

没有必要打电话PositionsMutex.Release();positionEnd()一旦positionEnd()设置了任务结果,GetPositionsAsync将继续并最终释放信号量,从而允许下一个调用者重新进入。

于 2021-03-22T10:05:01.743 回答