1

我正在使用使用 TCP 和 SSL 流的非常特定的 Web API。与 API 的连接是持久的(不应在每次写入/读取后关闭),因为此连接用于接收来自服务器的事件通知。

于是我实现了一个循环阅读的方法:

private void Listen()
{
    int length;
    do
    {
        var buffer = new byte[TcpClient.ReceiveBufferSize];
        length = SslStream.Read(buffer, 0, TcpClient.ReceiveBufferSize);

        if (length > 0)
        {
            // Process received data
            Listen();
        }
    } while (length > 0);

    //reconnect
}

现在我需要调用一些 API 方法。我希望他们支持 TPL(异步和等待)。问题是这些方法的异步部分实际上是在上面的读取方法(Listen)中实现的。例如:

public async Task<bool> Authenticate(string clientId, string clientSecret)
{
    await SendAuthMessageOverNetwork(clientId, clientSecret);
    return await ReceiveAuthResponseFromServer();
    // The problem is that there is no ReceiveAuthResponseFromServer method - 
    // the response is received in reading thread (Listen).
}

我对 TPL 还不是很熟悉。我通过使用一个任务解决了这个问题,它实际上什么都不做,只是等待来自读取线程的信号(AutoResetEvent):

private AutoResetEvent _loginEvent;
private bool _loginResult;

public async Task<bool> Authenticate(string clientId, string clientSecret)
{
    await SendAuthMessageOverNetwork(clientId, clientSecret);
    return await Task<bool>.Factory.StartNew(() => { _loginEvent.WaitOne(); return _loginResult; });
}

private void Listen()
{
    ...
    if (msg.Type == MessageTypes.AuthenticationResponse)
    {
        _loginResult = true;
        _loginEvent.Set();
    }
    ...
}

但我不太喜欢这个解决方案。也许有一种更简单的方法来实现我想要的?这可以通过仅使用任务功能而不使用 AutoResetEvent 和中间全局变量来完成吗?

4

1 回答 1

4

你是对的,使用 AutoResetEvent 是错误的方法。你真正想做的是:

  • 假设每个 SendAuthMessageOverNetwork 都与 ReceiveAuthResponseFromServer 匹配,请将它们组合成一个方法。
  • 在该方法中,发送请求并将新的 TaskCompletionSource 放入读取循环可以看到的队列中
  • 返回任务完成源的 Task 属性作为结果
  • 在读取循环中,当您读取消息时,将下一个完成源从队列中取出并使用 taskSource.SetResult(responseFromServer)

所以是这样的:

private readonly Queue<ResponseMessageType> _responseQueue = new Queue<ResponseMessageType>();

public async Task<bool> Authenticate(string clientId, string clientSecret) {
    var response = AsyncRequestAResponse(MakeAuthMessage(clientId, clientSecret));
    return (await response).Type == MessageTypes.AuthenticationResponse
}

public Task<bool> AsyncRequestAResponse(RequestMessageType request) {
    var responseSource = new TaskCompletionSource<ResponseMessageType>();
    _responseQueue.Enqueue(responseSource);
    Send(request);
    return responseSource.Task
}

private void Listen() {
    ...
    if (_responseQueue.Count == 0)
        throw new Exception("Erm, why are they responding before we requested anything?");
    _responseQueue.Dequeue().SetResult(msg);
}

换句话说,用于TaskCompletionSource<T>将您在内部执行的网络读/写内容转换为您向调用者公开的异步内容。

它可能看起来与上面不完全一样......我假设示例中存在继承顺序和一对一的响应/请求。您可能必须将请求 ID 与响应 ID 或类似的东西相匹配,并且具有将异常而不是结果放入队列任务等的超时。

此外,如果响应可以与发送的请求同时到达,那么在涉及队列的操作周围放置锁很重要。

于 2013-11-05T06:09:49.957 回答