2

在 .Net 4.5 中有一个 Stream 类的方法,它在监视取消令牌的同时从流中异步读取。

ReadAsync : 
       buffer:byte[] * 
       offset:int * 
       count:int * 
       cancellationToken:CancellationToken -> Task<int>

如果另一个线程应该首先触发取消令牌然后关闭流,那么是否保证在ReadAsync抛出异常之前将取消读取线程?

我可以使用 .Net 4.0 框架和没有 ReadAsync 的 F# 异步工作流以某种方式实现此保证吗(它没有接受要监视的取消令牌的重载)?

4

3 回答 3

3

除非您将多个任务链接在一起,否则这种特殊的重载基本上是无用的——cancellationToken仅在进入ReadAsync方法调用时检查,而不是在底层Stream.BeginRead调用执行时检查。

从 ILSpy 转储的代码:

public virtual Task<int> ReadAsync(byte[] buffer,
                                   int offset,
                                   int count,
                                   CancellationToken cancellationToken)
{
    if (!cancellationToken.IsCancellationRequested)
        return this.BeginEndReadAsync(buffer, offset, count);
    return Task.FromCancellation<int>(cancellationToken);
}

如您所见,cancellationToken不会转发到BeginEndReadAsync呼叫中,而BeginEndReadAsync只是按照以下方式实现Stream.BeginRead

private Task<int> BeginEndReadAsync(byte[] buffer, int offset, int count)
{
    return TaskFactory<int>.FromAsyncTrim<Stream, Stream.ReadWriteParameters>(
        this,
        new Stream.ReadWriteParameters
        {
            Buffer = buffer,
            Offset = offset,
            Count = count
        },
        (Stream stream, Stream.ReadWriteParameters args, AsyncCallback callback, object state) =>
            stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state),
        (Stream stream, IAsyncResult asyncResult) =>
            stream.EndRead(asyncResult)
    );
}

此时,您拥有的唯一保证是派生流类型所做出的保证,这些类型因类型而异。

请注意,这是基于当前的 .Net 4.5 位,并且实现当然会在未来发生变化。

于 2012-09-26T19:51:38.793 回答
1

您可以在使用/Task.IsCanceled运行任务之前进行检查。ResultRunSynchronously

这是一些示例代码:

use stream = new MemoryStream(Array.init 1000 (fun i -> byte (i % int Byte.MaxValue)))
use waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
use cts = new CancellationTokenSource()
let thread = Thread(fun () -> 
  Thread.Sleep(1000)
  let buf = Array.zeroCreate 100
  let task = stream.ReadAsync(buf, 0, buf.Length, cts.Token)
  if not task.IsCanceled then task.RunSynchronously()
  waitHandle.Set() |> ignore)
thread.Start()
cts.Cancel()
waitHandle.WaitOne() |> ignore

但是一旦ReadAsync开始,它抛出一个AggregateException声明任务已被取消。

于 2012-09-26T19:51:46.390 回答
0

AFAIK,您必须同时使用互斥锁来保护流并使用取消令牌来检查线程是否有待取消。没有异步原语Stream可以为您处理这个问题。

于 2012-09-27T15:32:59.560 回答