
I have an expensive method that I call to create a batch of source items:

private Task<List<SourceItem>> GetUnprocessedBatch(int batchSize)
{
    //impl
}

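I only want to fetch new items when there are no items left to process (or when their number drops below some threshold). So far I haven't been able to figure out which Source method to use.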

I've implemented a primitive stream that keeps returning new items:

public class Stream
{
    private readonly Queue<SourceItem> scrapeAttempts;
    private int batchSize = 100;
    private int minItemCount = 10;

    public Stream()
    {
        scrapeAttempts = new Queue<SourceItem>();
    }

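    // Returns the next item, refilling the queue (and awaiting the expensive
    // call inline) whenever fewer than minItemCount items remain.
    public async Task<SourceItem> Next()
    {
        if (scrapeAttempts.Count < minItemCount)
        {
            var entryScrapeAttempts = await GetUnprocessedBatch(batchSize);
            entryScrapeAttempts.ForEach(attempt => scrapeAttempts.Enqueue(attempt));
        }

        return scrapeAttempts.Dequeue();
    }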

}
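For reference, here is a self-contained reproduction of the refill rule in `Next()` above. It replaces `SourceItem` with `int` and substitutes a hypothetical stub for `GetUnprocessedBatch` that simply counts its calls. It assumes C# 9 top-level statements (.NET 5 or later). The point it illustrates: the batch is fetched on the first call and then again only once fewer than `minItemCount` items remain, and at that moment the caller waits for the expensive fetch inline.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

const int batchSize = 100;
const int minItemCount = 10;
var queue = new Queue<int>();
int fetchCount = 0;
int nextId = 0;

// Hypothetical stand-in for GetUnprocessedBatch: returns the next `size`
// consecutive ids and counts how often the "expensive" call was made.
Task<List<int>> GetUnprocessedBatch(int size)
{
    fetchCount++;
    var batch = Enumerable.Range(nextId, size).ToList();
    nextId += size;
    return Task.FromResult(batch);
}

// Same rule as Stream.Next(): refill only when fewer than minItemCount
// items are queued, awaiting the fetch before returning an item.
async Task<int> Next()
{
    if (queue.Count < minItemCount)
    {
        var batch = await GetUnprocessedBatch(batchSize);
        batch.ForEach(queue.Enqueue);
    }
    return queue.Dequeue();
}

var taken = new List<int>();
for (int i = 0; i < 92; i++)
    taken.Add(await Next());

Console.WriteLine($"items: {taken.Count}, fetches: {fetchCount}");
// prints "items: 92, fetches: 2"
```

With `batchSize = 100` and `minItemCount = 10`, the second fetch happens on the 92nd call, when only 9 items are left in the queue.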

I expected Source.Task to work, but it seems to be called only once. How can I create a source for this scenario?


1 Answer


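So conceptually, what you want is a Source stage that fetches elements asynchronously in batches, buffers each batch, and propagates the elements downstream one by one. When the buffer is close to empty, we want to eagerly call the next fetch on a side thread (but never more than one at a time), so that it can complete while we are still draining the current batch.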
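To make the prefetching idea concrete independently of Akka.NET, here is a minimal sketch using a C# 8 async iterator (`IAsyncEnumerable<T>`). It assumes .NET 5+ top-level statements, and `PreFetch` and `FetchAsync` are hypothetical names. The sketch starts a fetch as soon as fewer than `threshold` items are buffered and keeps at most one fetch in flight. The consumer waits only when the buffer is completely empty. Treating an empty batch as "no more data" is an assumption of this sketch and does not come from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

int fetchCalls = 0;
int nextId = 0;

// Hypothetical batch source: each call yields the next 5 ids after a short
// delay, standing in for the expensive GetUnprocessedBatch.
async Task<IEnumerable<int>> FetchAsync()
{
    fetchCalls++;
    int start = nextId;
    nextId += 5;
    await Task.Delay(10);
    return Enumerable.Range(start, 5);
}

// Buffers batches and starts the next fetch as soon as fewer than
// `threshold` items remain, with at most one fetch in flight.
async IAsyncEnumerable<T> PreFetch<T>(Func<Task<IEnumerable<T>>> fetch, int threshold)
{
    var queue = new Queue<T>();
    Task<IEnumerable<T>> pending = null;
    while (true)
    {
        if (pending == null && (queue.Count == 0 || queue.Count < threshold))
            pending = fetch();                 // eager: loads while we keep yielding

        if (queue.Count == 0)
        {
            // Nothing buffered: only now does the consumer wait for the fetch.
            foreach (var item in await pending) queue.Enqueue(item);
            pending = null;
            if (queue.Count == 0) yield break; // assumption: empty batch = no more data
            continue;
        }

        yield return queue.Dequeue();

        // Pick up a batch that finished while the consumer was busy.
        if (pending != null && pending.IsCompleted)
        {
            foreach (var item in await pending) queue.Enqueue(item);
            pending = null;
        }
    }
}

var received = new List<int>();
await foreach (var x in PreFetch<int>(FetchAsync, threshold: 2))
{
    received.Add(x);
    if (received.Count == 12) break;
}

Console.WriteLine($"received {received.Count} items using {fetchCalls} fetches");
// prints "received 12 items using 3 fetches"
```

Breaking out of the `await foreach` disposes the iterator. Any fetch still in flight at that point is simply abandoned.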

This behavior requires building a custom GraphStage. One could look like this:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Stage;

// Usage (sketch): Source.FromGraph(new PreFetch<SourceItem>(10, async () => await GetUnprocessedBatch(100)))
sealed class PreFetch<T> : GraphStage<SourceShape<T>>
{
    private readonly int threshold;
    private readonly Func<Task<IEnumerable<T>>> fetch;
    private readonly Outlet<T> outlet = new Outlet<T>("prefetch");

    public PreFetch(int threshold, Func<Task<IEnumerable<T>>> fetch)
    {
        this.threshold = threshold;
        this.fetch = fetch;
        this.Shape = new SourceShape<T>(this.outlet);
    }

    public override SourceShape<T> Shape { get; }

    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);

    private sealed class Logic : GraphStageLogic
    {
        public Logic(PreFetch<T> stage) : base(stage.Shape)
        {
            // queue for batched elements
            var queue = new Queue<T>();
            // flag indicating that downstream pulled,
            // but we had no elements to push at that moment
            var wasPulled = false;
            // determines whether a fetch is already in progress
            var fetchInProgress = false;

            // to cooperate with async calls without data races,
            // we register async callbacks for the success and failure scenarios
            var onSuccess = this.GetAsyncCallback<IEnumerable<T>>(batch =>
            {
                fetchInProgress = false;
                foreach (var item in batch) queue.Enqueue(item);
                if (wasPulled)
                {
                    // a pull was requested but not fulfilled; push now that we have elements
                    // (this assumes fetch returned a non-empty batch)
                    Push(stage.outlet, queue.Dequeue());
                    wasPulled = false;
                }
            });
            var onFailure = this.GetAsyncCallback<Exception>(this.FailStage);

            SetHandler(stage.outlet, onPull: () =>
            {
                if (queue.Count < stage.threshold && !fetchInProgress)
                {
                    // the queue dropped below the threshold:
                    // call fetch on a side thread and handle its result asynchronously
                    fetchInProgress = true;
                    stage.fetch().ContinueWith(task =>
                    {
                        // depending on whether the task failed, call the corresponding callback
                        if (task.IsFaulted || task.IsCanceled)
                            onFailure(task.Exception?.GetBaseException() ?? new TaskCanceledException(task));
                        else onSuccess(task.Result);
                    });
                }

                // if the queue is empty, we cannot push immediately, so we only mark
                // that a pull request has been made but not yet fulfilled
                if (queue.Count == 0)
                    wasPulled = true;
                else
                {
                    Push(stage.outlet, queue.Dequeue());
                    wasPulled = false;
                }
            });
        }
    }
}
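The pull/push bookkeeping of the stage can also be exercised without Akka.NET. The sketch below is a plain C# model that assumes .NET 5+ top-level statements. In it, `OnPull` and `OnSuccess` mirror the stage's `onPull` handler and `onSuccess` callback, and `pushed` records what `Push` would send downstream. Fetch completions are injected by hand here rather than arriving from a `Task`. The model replays a short sequence of events and shows two behaviors: a parked pull is fulfilled when a batch arrives, and the next fetch starts once the buffer drops below the threshold. The `queue.Count > 0` check in `OnSuccess` is an added guard that the stage above does not have.

```csharp
using System;
using System.Collections.Generic;

// State mirrored from the stage's Logic constructor.
var queue = new Queue<int>();
var pushed = new List<int>();      // elements "pushed" downstream
bool wasPulled = false;
bool fetchInProgress = false;
int fetchesStarted = 0;
const int threshold = 2;

// Mirrors the onPull handler; starting a fetch is only recorded here.
void OnPull()
{
    if (queue.Count < threshold && !fetchInProgress)
    {
        fetchInProgress = true;
        fetchesStarted++;
    }
    if (queue.Count == 0) wasPulled = true;   // park the pull
    else { pushed.Add(queue.Dequeue()); wasPulled = false; }
}

// Mirrors the onSuccess async callback for a completed fetch.
void OnSuccess(IEnumerable<int> batch)
{
    fetchInProgress = false;
    foreach (var item in batch) queue.Enqueue(item);
    if (wasPulled && queue.Count > 0)         // guard against an empty batch
    {
        pushed.Add(queue.Dequeue());
        wasPulled = false;
    }
}

OnPull();                     // buffer empty: fetch #1 starts, the pull is parked
OnSuccess(new[] { 1, 2, 3 }); // batch arrives: the parked pull gets 1
OnPull();                     // 2 buffered (not below threshold): pushes 2
OnPull();                     // 1 buffered (below threshold): fetch #2 starts, pushes 3
OnPull();                     // buffer empty, fetch #2 in flight: the pull is parked
OnSuccess(new[] { 4, 5 });    // batch arrives: the parked pull gets 4

Console.WriteLine($"pushed [{string.Join(", ", pushed)}], fetches started: {fetchesStarted}");
// prints "pushed [1, 2, 3, 4], fetches started: 2"
```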
answered 2019-05-29T04:56:40.970