5

我正在尝试结合 using 创建一个任务管道/有序调度程序TaskFactory.FromAsync

我希望能够触发 Web 服务请求(FromAsync用于使用 I/O 完成端口)但保持它们的顺序并且在任何时候只有一个执行。

目前我不使用FromAsync,所以我可以做TaskFactory.StartNew(()=>api.DoSyncWebServiceCall())并依靠OrderedTaskScheduler使用的TaskFactory来确保只有一个请求是未完成的。

我假设使用该FromAsync方法时这种行为会保持不变,但事实并非如此:

TaskFactory<Stuff> taskFactory = new TaskFactory<Stuff>(new OrderedTaskScheduler());
var t1 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t2 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t3 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));

所有这些beginGetStuff方法都在调用中被FromAsync调用(因此虽然它们是按顺序分派的,但n同时发生了 api 调用)。

有一个重载FromAsync需要一个TaskScheduler:

public Task FromAsync(
    IAsyncResult asyncResult,
    Action<IAsyncResult> endMethod,
    TaskCreationOptions creationOptions,
    TaskScheduler scheduler
)

但文档说:

用于调度执行 end 方法的任务的 TaskScheduler。

正如您所看到的,它需要已经构建的IAsyncResult,而不是Func<IAsyncResult>.

这需要自定义FromAsync方法还是我错过了什么?谁能建议从哪里开始这个实现?

干杯,

编辑:

我想从调用者那里抽象出这种行为,因此,根据TaskFactory(使用专门的TaskScheduler)的行为,我需要立即返回 Task - 这个 Task 不仅会封装FromAsyncTask,还会在等待时封装该任务的队列轮到执行了。

一种可能的解决方案:

class TaskExecutionQueue
{
    private readonly OrderedTaskScheduler _orderedTaskScheduler;
    private readonly TaskFactory _taskFactory;
    public TaskExecutionQueue(OrderedTaskScheduler orderedTaskScheduler)
    {
        _orderedTaskScheduler = orderedTaskScheduler;
        _taskFactory = new TaskFactory(orderedTaskScheduler);

    }

    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return _taskFactory.StartNew(taskGenerator).Unwrap();
    }
}

FromAsync但是,这会在调用发生时使用线程。理想情况下,我不必这样做。

4

3 回答 3

2

最简单的方法是使用TPL Dataflow

您可以定义一个接收异步委托流并一次执行一个的“块”(等到每个委托完成后再开始下一个委托):

var block = new ActionBlock<Func<Task>>(func => func());

然后,触发 Web 服务请求:

block.Post(() => Task.Factory.FromAsync(...));

或(我更喜欢):

block.Post(() => client.GetStuffAsync(a, b, c));

ActionBlock如果您只想执行任务,这种方法很好。如果要产生输出流,请查看TransformBlock

var block = new TransformBlock<Func<Task<Stuff>>, Stuff>(func => func());

你以同样的方式触发你的请求,你可以通过调用Receive或得到结果ReceiveAsync

于 2012-12-06T18:11:02.867 回答
2

您无法安排 IO 任务,因为它们没有与之关联的线程。Windows 内核提供无线程 IO 操作。启动这些 IO 不涉及托管代码,并且TaskScheduler类不会发挥作用。

所以你必须延迟启动 IO,直到你确定你真的想要网络被击中。您可以使用SemaphoreSlim.WaitAsync限制当前运行的任务数量。在启动单个 IO 之前等待该方法的结果并等待它。

于 2012-12-06T17:22:57.383 回答
0

我已经决定在这里定制一个解决方案......锁很乱而且不受欢迎,但目前,这可以完成我想要的工作。

public interface ITaskExecutionQueue
{
    Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator);
    Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator);
    int OutstandingTaskCount { get; }
    event EventHandler OutstandingTaskCountChanged;
}

/// This class ensures that only a single Task is executed at any one time.  They are executed sequentially in order being queued.
/// The advantages of this class over OrderedTaskScheduler is that you can use any type of Task such as FromAsync (I/O Completion ports) 
/// which are not able to be scheduled using a traditional TaskScheduler.
/// Ensure that the `outer` tasks you queue are unstarted.  E.g. <![CDATA[
/// _taskExeQueue.QueueTask(new Task<Task<TResult>>(() => StartMyRealTask()));
/// ]]>
class OrderedTaskExecutionQueue : ITaskExecutionQueue
{
    private readonly Queue<Task> _queuedTasks = new Queue<Task>();
    private Task _currentTask;
    private readonly object _lockSync = new object();

    /// <summary>
    /// Queues a task for execution
    /// </summary>
    /// <typeparam name="TResult"></typeparam>
    /// <param name="taskGenerator">An unstarted Task that creates your started real-work task</param>
    /// <returns></returns>
    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return QueueTask(new Task<Task<TResult>>(taskGenerator));
    }

    public Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator)
    {
        Task<TResult> unwrapped = taskGenerator.Unwrap();
        unwrapped.ContinueWith(_ =>
                               {
                                   EndTask();
                                   StartNextTaskIfQueued();
                               }, TaskContinuationOptions.ExecuteSynchronously);

        lock (_lockSync)
        {
            _queuedTasks.Enqueue(taskGenerator);

            if (_currentTask == null)
            {
                StartNextTaskIfQueued();
            }
        }

        TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();
        tcs.TrySetFromTaskIncomplete(unwrapped);

        OutstandingTaskCountChanged.Raise(this);

        return tcs.Task;
    }

    private void EndTask()
    {
        lock (_lockSync)
        {
            _currentTask = null;
            _queuedTasks.Dequeue();
        }

        OutstandingTaskCountChanged.Raise(this);
    }

    private void StartNextTaskIfQueued()
    {
        lock (_lockSync)
        {
            if (_queuedTasks.Count > 0)
            {
                _currentTask = _queuedTasks.Peek();

                _currentTask.RunSynchronously();
            }
        }
    }

    /// <summary>
    /// Includes the currently executing task.
    /// </summary>
    public int OutstandingTaskCount
    {
        get
        {
            lock (_lockSync)
            {
                return _queuedTasks.Count;
            }
        }
    }

    public event EventHandler OutstandingTaskCountChanged;
}

接受一个未启动的Task<Task<TResult>>- 这允许队列决定何时执行它并开始FromAsync调用(这是内部任务)。用法:

Task<Task<TResult>> queueTask = new Task<Task<TResult>>(() => Task.Factory.FromAsync(beginAction, endAction));
Task<TResult> asyncCallTask = _taskExecutionQueue.QueueTask(queueTask);
于 2012-12-07T17:05:47.707 回答