4

我正在通过移植一些旧的套接字代码以使用 TPL 数据流和新的异步功能来试验 TPL 数据流。尽管 API 感觉坚如磐石,但我的代码最终还是感觉很乱。我想知道我是否在这里遗漏了什么。

我的要求如下: 一个套接字类公开:Open、Close、Send 和 Receive 方法。所有都返回一个任务,因此是异步的。Open 和 Close 是原子的。发送和接收可以彼此相邻工作,尽管它们一次只能处理 1 个命令。

从逻辑上讲,这让我想到了下一段内部控制代码:

// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;

// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()  { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()    { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });

到目前为止一切都很好。我可以安全地将操作发送到发送和接收块,而同时不必担心正在运行的连接相关操作。ActionBlock 还确保多个发送调用是同步的(接收、关闭和打开同上)。

问题是没有简单的方法让动作将任务传达回海报。现在我正在使用 TaskCompletionSource 来传达结果。喜欢:

public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
    TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>();

    sendBlock.Post(async () =>
    {
        if (!tcpClient.Connected)
            throw new InvalidOperationException("Cant send when not open");
        else
        {
            await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
            resultCompletionSource.SetResult(null);
        }
    });

    return resultCompletionSource.Task;
}

只是感觉丑陋和笨拙。我的问题是:有没有办法使用 TPL 同步工作流,而不必使用 TaskCompletionSource 进行通信?

谢谢!

4

1 回答 1

5

首先,您根本不需要 TPL 数据流,因为您实际上没有任何数据流。

其次,TaskScheduler像这样使用 s 也不是正确的解决方案。TaskSchedulers schedule code,但是当你await有事的时候,没有代码在运行。因此,WriteAsync()在进行异步工作时,代码 forOpen()可以运行。

你真正需要的是类似的东西ReaderWriterLock,但它适用于async. 框架中没有类似的东西,但您可以使用Stephen Toub 的文章Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock中的代码,这正是您所需要的。该文章还更详细地解释了为什么使用 aTaskScheduler是错误的。

使用AsyncReaderWriterLock,您的代码可能如下所示:

public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
    using (await readerWriterLock.ReaderLockAsync())
    {
        if (!tcpClient.Connected)
            throw new InvalidOperationException("Can't send when not open");

        await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
    }
}
于 2012-06-12T16:58:38.003 回答