0

再会。

我有一个用于 rpc 调用的 TPL 数据流网格

它有两个未扭结的流程,简化后如下所示:

输出流量:

  • BlockBuffer 存储输出
  • ActionBlock 将输出发送到服务器并生成发送的 id

和输入流:

  • while循环接收数据
  • TransformBlock 解析数据
  • BlockBuffer 用 sentid 保存答案

有一个问题:当我从不同的线程拨打电话时,我可能会弄乱答案,所以我需要过滤它。

我的 RPC 调用:

public async Task<RpcAnswer> PerformRpcCall(Call rpccall)
{
    ...
    _outputRpcCalls.Post(rpccall);
    long uniqueId = GetUniq(); // call unique id
    ...
    var sent = new Tuple<long, long>(uniqueId, 0);
    while (_sentRpcCalls.TryReceive(u => u.Item1 == uniqueId, out sent)) ; // get generated id from send function

    return await _inputAnswers.ReceiveAsync(TimeSpan.FromSeconds(30));
}

如您所见,我有 uniqueId 可以帮助我确定此呼叫的答案,但是我如何过滤它并等待它?

拥有一些缓冲区数组(可能是WriteOnceBlock?)是一个好方法吗?这些缓冲区将在rpc调用中创建并带有过滤器的LinkedTo?

4

2 回答 2

0

一种方法是为每个 id 创建一个新块,并将其链接到 answers 块,使用谓词检查 id 并MaxMessages设置为 1:

Task<Answer> ReceiveAnswerAsync(int uniqueId)
{
    var block = new BufferBlock<Answer>();

    _inputAnswers.LinkTo(
        block,
        new DataflowLinkOptions { MaxMessages = 1, PropagateCompletion = true },
        answer => answer.Id == uniqueId);

    return block.ReceiveAsync();
}
于 2013-08-30T22:46:28.830 回答
0

好的,我没有找到任何合适的方法,所以我做了一个肮脏的解决方法

while (true)
{
    answer = await _inputAnswers.ReceiveAsync(TimeSpan.FromSeconds(5)); 

    if (answer.Success)
    {
        if (answer.Answer.Combinator.ValueType.Equals(rpccall.Combinator.ValueType))
        {
            break;
        }
        else
        {
            // wrong answer - post it back
            _inputAnswers.Post(answer.Answer);
        }

    }
    else
    {
        // answer fail - return it
        break;
    }
}
于 2013-08-25T08:25:47.093 回答