3

我在 StackOverflow 上阅读了许多文章和问题,这些文章和问题是关于Task使用TaskCompletionSource.

我最初的观察是,这种技术似乎转移了并发的责任。例如,Solace 代理库有一个Send()可能阻塞的方法,然后我们在网络通信完成后得到一个回调来指示“真正的”成功或失败。所以这个Send()方法可以很快被调用,而且vendor库内部限制了并发。

当您放置一个任务时,您似乎要么序列化操作(foreach message await SendWrapperAsync(message)),要么通过决定启动多少任务(例如,使用 TPL 数据流)自己承担并发责任。

无论如何,我决定Send用一个保证人包装调用,保证人将永远重试,直到回调指示成功,并负责并发性。这是一个“有保证的”消息传递系统。失败不是一种选择。这要求担保人可以施加背压,但这并不真正在这个问题的范围内。我在下面的示例代码中有一些关于它的评论。

它的意思是我的热路径,它包装了发送+回调,因为重试逻辑而“特别热”。所以这里有很多TaskCompletionSource创造。

供应商自己的文档建议尽可能重用其Message对象,而不是为每个Send. 我决定为此使用 aChannel作为环形缓冲区。但这让我想知道 - 是否有一些替代TaskCompletionSource方法 - 也许其他一些对象也可以缓存在环形缓冲区中并重用,达到相同的结果?

我意识到这可能是对微优化的过分热心尝试,老实说,我正在探索 C# 的几个方面,这些方面超出了我的薪酬等级(我是一个 SQL 人,真的),所以我可能会遗漏一些明显的东西。如果答案是“你实际上并不需要这种优化”,那我就不放心了。如果答案是“这真的是唯一明智的方法”,我的好奇心就会得到满足。

这是一个功能齐全的控制台应用程序,它模拟MockBroker对象中 Solace 库的行为,以及我对它进行包装的尝试。我的热门路径是类中的SendOneAsync方法Guarantor。代码对于 SO 来说可能有点太长了,但它是我可以创建的最小演示,它捕获了所有重要元素。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

internal class Message { public bool sent; public int payload; public object correlator; }

// simulate third party library behaviour
internal class MockBroker
{
    public bool TrySend(Message m, Action<Message> callback)
    {
        if (r.NextDouble() < 0.5) return false; // simulate chance of immediate failure / "would block" response
        Task.Run(() => { Thread.Sleep(100); m.sent = r.NextDouble() < 0.5; callback(m); }); // simulate network call
        return true;
    }

    private Random r = new();
}

// Turns MockBroker into a "guaranteed" sender with an async concurrency limit
internal class Guarantor
{
    public Guarantor(int maxConcurrency)
    {
        _broker = new MockBroker();
        // avoid message allocations in SendOneAsync
        _ringBuffer = Channel.CreateBounded<Message>(maxConcurrency);
        for (int i = 0; i < maxConcurrency; i++) _ringBuffer.Writer.TryWrite(new Message());
    }

    // real code pushing into a T.T.T.DataFlow block with bounded capacity and parallelism
    // execution options both equal to maxConcurrency here, providing concurrency and backpressure
    public async Task Post(int payload) => await SendOneAsync(payload);

    private async Task SendOneAsync(int payload)
    {
        Message msg = await _ringBuffer.Reader.ReadAsync();
        msg.payload = payload;
        // send must eventually succeed
        while (true)
        {
            // *** can this allocation be avoided? ***
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            msg.correlator = tcs;
            // class method in real code, inlined here to make the logic more apparent
            Action<Message> callback = (msg) => (msg.correlator as TaskCompletionSource<bool>).SetResult(msg.sent);
            if (_broker.TrySend(msg, callback) && await tcs.Task) break;
            else
            {
                // simple demo retry logic
                Console.WriteLine($"retrying {msg.payload}");
                await Task.Delay(500);
            }
        }
        // real code raising an event here to indicate successful delivery
        await _ringBuffer.Writer.WriteAsync(msg);
        Console.WriteLine(payload);
    }

    private Channel<Message> _ringBuffer;
    private MockBroker _broker;
}

internal class Program
{
    private static async Task Main(string[] args)
    {
        // at most 10 concurrent sends
        Guarantor g = new(10);
        // hacky simulation since in this demo there's nothing generating continuous events,
        // no DataFlowBlock providing concurrency (it will be limited by the Channel instead),
        // and nobody to notify when messages are successfully sent
        List<Task> sends = new(100);
        for (int i = 0; i < 100; i++) sends.Add(g.Post(i));
        await Task.WhenAll(sends);
    }
}
4

1 回答 1

2

TaskCompletionSource是的,您可以通过使用轻量级ValueTasks 而不是s来避免分配实例TaskIValueTaskSource<T>首先,您需要一个可以实现接口的可重用对象,并且Message看起来是完美的候选对象。为了实现这个接口,你可以使用ManualResetValueTaskSourceCore<T>struct. 这是一个可变结构,因此不应将其声明为readonly. 您只需要将接口方法委托给此结构的相应方法,名称很长:

using System.Threading.Tasks.Sources;

internal class Message : IValueTaskSource<bool>
{
    public bool sent; public int payload; public object correlator;

    private ManualResetValueTaskSourceCore<bool> _source; // Mutable struct, not readonly

    public void Reset() => _source.Reset();
    public short Version => _source.Version;
    public void SetResult(bool result) => _source.SetResult(result);

    ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token)
        => _source.GetStatus(token);
    void IValueTaskSource<bool>.OnCompleted(Action<object> continuation,
        object state, short token, ValueTaskSourceOnCompletedFlags flags)
            => _source.OnCompleted(continuation, state, token, flags);
    bool IValueTaskSource<bool>.GetResult(short token) => _source.GetResult(token);
}

这三个成员GetStatus,OnCompletedGetResult是实现接口所必需的。其他三个成员(ResetVersionSetResult将用于创建和控制ValueTask<bool>s。

TrySend现在让我们将类的方法包装MockBroker在一个异步方法TrySendAsync中,该方法返回一个ValueTask<bool>

static class MockBrokerExtensions
{
    public static ValueTask<bool> TrySendAsync(this MockBroker source, Message message)
    {
        message.Reset();
        bool result = source.TrySend(message, m => m.SetResult(m.sent));
        if (!result) message.SetResult(false);
        return new ValueTask<bool>(message, message.Version);
    }
}

message.Reset();重置IValueTaskSource<bool>, 并声明先前的异步操作已完成。A 一次IValueTaskSource<T>只支持一个异步操作,producedValueTask<T>只能等待一次,在 next 之后就不能再等待了Reset()。这就是避免分配对象所必须付出的代价:您必须遵循更严格的规则。如果你试图改变规则(有意或无意地),ManualResetValueTaskSourceCore<T>就会开始到处乱扔InvalidOperationExceptions。

现在让我们使用TrySendAsync扩展方法:

while (true)
{
    if (await _broker.TrySendAsync(msg)) break;

    // simple demo retry logic
    Console.WriteLine($"retrying {msg.payload}");
    await Task.Delay(500);
}

Console您可以在整个GC.GetTotalAllocatedBytes(true)操作之前和之后打印,以查看差异。确保在发布模式下运行应用程序,以查看真实图片。您可能会看到差异并不那么令人印象深刻,因为实例的大小与 . 分配的字节以及为在 .中写入内容而生成的所有sTaskCompletionSource相比非常小。Task.DelaystringConsole

于 2021-09-12T12:04:53.453 回答