3

我正在尝试学习如何在 C++11 中执行“令人尴尬”的并行任务。我遇到的一个常见模式是在对一系列值进行评估时获取函数的结果,类似于调用 python 的multiprocessing.Pool.map. 我写了一个最小的例子来展示我知道怎么做,即调用一个进程并等待结果。如何异步“映射”此调用并等待所有值完成?理想情况下,我希望得到与原始向量具有相同长度和顺序的向量。

#include <iostream>
#include <thread>
#include <future>
#include <vector>

using namespace std;

double square_add(double x, double y) { return x*x+y; }

int main() {
  vector<double> A = {1,2,3,4,5};

  // Single evaluation
  auto single_result = std::async(square_add,A[2],3);
  cout << "Evaluating a single index " << single_result.get() << endl;

  // Blocking map
  for(auto &x:A) {
    auto blocking_result = std::async(square_add,x,3);
    cout << "Evaluating a single index " << blocking_result.get() << endl;
  }

  // Non-blocking map?

  return 0;
}

注意:要使用此代码进行编译,gcc我需要该-pthreads标志。

4

2 回答 2

5

std::async 返回一个未来,因此您可以将它们存储在向量中以供以后使用:

std::vector<std::future<double>> future_doubles;
future_doubles.reserve(A.size());
for (auto& x : A) {
    // Might block, but also might not.
    future_doubles.push_back(std::async(square_add, x, 3));
}

// Now block on all of them one at a time.
for (auto& f_d : future_doubles) {
    std::cout << f_d.get() << std::endl;
}

现在上面的代码可能会也可能不会异步运行。由实现/系统决定是否值得异步执行任务。如果您想强制它在单独的线程中运行,您可以将可选的 launch_policy 传递给 std::async,将调用更改为

future_doubles.push_back(std::async(std::launch::async, square_add, x, 3));

有关std::async各种政策的更多信息,请参见此处

于 2013-10-01T20:45:14.077 回答
1

bstamour已经展示了规范/更简单的方法。但是,如果迭代速度很关键和/或结果类型必须是:std::vector<double>

std::vector<double> results(A.size());
std::vector<std::future<void>> futures;
    futures.reserve(A.size());                  // second allocation

auto store_result = [](double& result, double x, double y)
                    { result = square_add(x, y); };
// non-capturing lambda might give better performance, test it

// launch the async operations
for(int i = 0; i != A.size(); ++i)
    futures.emplace_back(std::async(store_result, std::ref(results[i]), A[i], 3));

// wait for all results to become ready
for(auto& e : futures)
    e.get();

// results are in the `results` vector
for(auto const& e : results)
    std::cout << e << std::endl;

完整示例:

#include <iostream>
#include <thread>
#include <future>
#include <vector>

using namespace std;

double square_add(double x, double y) { return x*x+y; }

int main() {
  vector<double> A = {1,2,3,4,5};

  // Single evaluation
  std::cout << "single evaluation" << std::endl;
  auto single_result = std::async(square_add,A[2],3);
  cout << "Evaluating a single index " << single_result.get() << endl;

  // Blocking map
  std::cout << "\n\n";
  std::cout << "blocking map" << std::endl;
  for(auto &x:A) {
    auto blocking_result = std::async(square_add,x,3);
    cout << "Evaluating a single index " << blocking_result.get() << endl;
  }

  // Non-blocking map?

    // bstamour's solution:
    std::cout << "\n\n";
    std::cout << "bstamour's solution" << std::endl;
    {
        std::vector<std::future<double>> future_doubles;
        future_doubles.reserve(A.size());
        for (auto& x : A) {
            // Might block, but also might not.
            future_doubles.push_back(std::async(square_add, x, 3));
        }

        // Now block on all of them one at a time.
        for (auto& f_d : future_doubles) {
            std::cout << f_d.get() << std::endl;
        }
    }

    // DyP's solution
    std::cout << "\n\n";
    std::cout << "DyP's solution" << std::endl;
    {
        std::vector<double> results(A.size());
        std::vector<std::future<void>> futures;
        futures.reserve(A.size());                  // second allocation
        auto store_result = [](double& result, double x, double y) { result = square_add(x, y); };
        for(int i = 0; i != A.size(); ++i)
            futures.emplace_back( std::async(store_result, std::ref(results[i]), A[i], 3) );

        for(auto& e : futures)
            e.get();

        for(auto const& e : results)
            std::cout << e << std::endl;
    }

  return 0;
}
于 2013-10-01T21:48:47.163 回答