0

我正在寻找一种组合异步操作的方法。最终目标是执行异步操作,或者让它运行完成,或者在用户定义的超时后返回。

出于示例目的,假设我正在寻找一种方法来组合以下协程1

IAsyncOperation<IBuffer> read(IBuffer buffer, uint32_t count)
{
    auto&& result{ co_await socket_.InputStream().ReadAsync(buffer, count, InputStreamOptions::None) };
    co_return result;
}

socket_作为一个实例StreamSocket

和超时协程:

IAsyncAction timeout()
{
    co_await 5s;
}

我正在寻找一种方法以某种方式组合这些协程,一旦读取数据或超时已过期,就会尽快返回。

这些是我迄今为止评估过的选项:

  • C++20 协程:据我了解P1056R0,目前没有库或语言功能“启用协程的创建和组合”
  • Windows 运行时提供了异步任务类型,最终派生自IAsyncInfo:同样,我没有找到任何可以让我按照我需要的方式组合任务的工具。
  • 并发运行时:这看起来很有希望,特别是when_any函数模板看起来正是我所需要的。

从那看来我需要使用并发运行时。但是,我很难将所有部分组合在一起。我对如何处理异常以及是否需要取消相应的其他并发任务感到特别困惑。

问题有两个方面:

  • 并发运行时是唯一的选项(UWP 应用程序)吗?
  • 实现会是什么样子?

1 这些方法在应用程序内部。不需要让它们返回与 Windows 运行时兼容的类型。

4

2 回答 2

1

我认为最简单的方法是使用该concurrency库。您需要修改超时以返回与第一种方法相同的类型,即使它返回 null。

(我意识到这只是部分答案......)

我的 C++ 很烂,但我认为这很接近......

array<task<IBuffer>, 2> tasks =
{
concurrency::create_task([]{return read(buffer, count).get();}),
concurrency::create_task([]{return modifiedTimeout.get();})
};

concurrency::when_any(begin(tasks), end(tasks)).then([](IBuffer buffer)
{ 
    //do something 
});

于 2019-04-04T04:02:25.033 回答
1

正如 Lee McPherson 在另一个答案中所建议的那样,并发运行时看起来是一个可行的选择。它提供了可以与其他任务组合、使用延续链接起来的任务,以及与 Windows 运行时异步模型无缝集成(请参阅在 C++ 中为 UWP 应用创建异步操作)。作为奖励,包括<pplawait.h>头文件为类模板实例化提供了适配器,concurrency::task以用作 C++20 协程等待对象。

我无法回答所有问题,但这是我最终想出的。为简单起见(和易于验证),我使用Sleep代替实际的读取操作,并返回 anint而不是IBuffer.

任务的组成

ConcRT 提供了几种组合任务的方法。给定的需求concurrency::when_any可用于创建在任何提供的任务完成时返回的任务。当仅提供 2 个任务作为输入时,还有一个便利运算符 ( operator||) 可用。

异常传播

任何一个输入任务引发的异常都不算成功完成。与when_any任务一起使用时,抛出异常不足以满足等待条件。因此,异常不能用于中断组合任务。为了解决这个问题,我选择返回 a std::optional,并继续引发适当的异常then

任务取消

这对我来说仍然是个谜。看来,一旦一个任务满足该任务的等待条件when_any,就不需要取消各自的其他未完成任务。一旦完成(成功或以其他方式),它们就会被默默地处理。

以下是代码,使用前面提到的简化。它创建了一个由实际工作负载和一个超时任务组成的任务,两者都返回一个std::optional. 继续检查返回值,并在没有返回值的then情况下抛出异常(即timeout_task先完成)。

#include <Windows.h>

#include <cstdint>
#include <iostream>
#include <optional>
#include <ppltasks.h>
#include <stdexcept>

using namespace concurrency;

task<int> read_with_timeout(uint32_t read_duration, uint32_t timeout)
{
    auto&& read_task
    {
        create_task([read_duration]
            {
                ::Sleep(read_duration);
                return std::optional<int>{42};
            })
    };
    auto&& timeout_task
    {
        create_task([timeout]
            {
                ::Sleep(timeout);
                return std::optional<int>{};
            })
    };

    auto&& task
    {
        (read_task || timeout_task)
        .then([](std::optional<int> result)
            {
                if (!result.has_value())
                {
                    throw std::runtime_error("timeout");
                }
                return result.value();
            })
    };
    return task;
}

以下测试代码

int main()
{
    try
    {
        auto res1{ read_with_timeout(3000, 5000).get() };
        std::cout << "Succeeded. Result = " << res1 << std::endl;
        auto res2{ read_with_timeout(5000, 3000).get() };
        std::cout << "Succeeded. Result = " << res2 << std::endl;
    }
    catch( std::runtime_error const& e )
    {
        std::cout << "Failed. Exception = " << e.what() << std::endl;
    }
}

产生这个输出:

Succeeded. Result = 42
Failed. Exception = timeout
于 2019-04-16T17:58:49.110 回答