4

因此,我正在尝试创建一种通用方式来创建期货容器,并以非阻塞方式执行所有 future.get()'。

我预计任务的完成时间应该在几百毫秒之间,通常最多 2 分钟。然而,有些可能根本无法完成。在典型的运行中将至少执行 10,000 个任务。

我希望返回最快的任务结果,而不会被期货容器中其他更长时间运行的任务所阻碍。

到目前为止,这是我使用虚拟睡眠时间来模拟任务完成延迟的内容(设计在很大程度上要归功于这里发表的好帖子,例如thisthis):

#include <future>
#include <vector>
#include <iostream>
#include <random>
#include <chrono>
#include <ratio>
#include <thread>
#include <algorithm>

size_t rand_from_range(const size_t, const size_t);
int rand_sleep_range(const size_t, const size_t);
template<class CT> size_t get_async_all( CT& );

// Given a function and a collection,
//  return a vector of futures.
template<class Function, class CT>
auto async_all( Function f, CT coll )
    -> std::vector<decltype(std::async(f, *std::begin(coll)))>
{
  std::vector<decltype(std::async(f, *std::begin(coll)))> futures;
  futures.reserve(coll.size());
  for (auto& element : coll)
    futures.push_back(std::async(f, element));
  return futures;
}

// Given the beginning and end of a number
//  range, return a random number therein.
size_t rand_from_range( const size_t range_begin, 
                        const size_t range_end )
{
  std::uniform_int_distribution<size_t> 
    distr(range_begin, range_end);
  std::random_device dev;
  return distr(dev);
} 

// Given a shortest and longest duration, put the calling
//  thread to sleep for a random duration therein. 
// (in milliseconds)
int rand_sleep_range( const size_t shortest_time, 
                      const size_t longest_time )
{
  std::chrono::milliseconds 
    sleep_time(rand_from_range(shortest_time, longest_time));
  std::this_thread::sleep_for(sleep_time);
  return (int)sleep_time.count();
} 

// Given a container of futures, perform all
//  get()'s.
template<class CT>
size_t get_async_all( CT& async_coll )
{
  size_t get_ctr(0);
  const size_t future_cnt = async_coll.size();
  std::vector<size_t> completed;
  completed.reserve(future_cnt);

  while (true) {
    for (size_t ndx = 0; ndx < future_cnt; ++ndx) {
      // Check to see if this ndx' future has completed already.
      if (std::none_of(std::begin(completed), std::end(completed), 
            [=](size_t x) {
              return (x == ndx);
            }))
      { // No, this one hasn't completed 
        //  yet, attempt to process it.
        auto& f = async_coll[ndx];
        if (f.wait_for(std::chrono::milliseconds(10)) 
              == std::future_status::ready) 
        {
          f.get(); // The future's work gets done here.
          ++get_ctr;
          completed.push_back(ndx);
          if (completed.size() == future_cnt) 
            break; // for()
        }
      }
    }
    if (completed.size() == future_cnt) 
      break; // while()
  }
  return get_ctr;
}

int main()
{
  // A dummy container of ints.
  std::vector<int> my_vec(100);
  for (auto& elem : my_vec)
    elem = rand_from_range(1, 100);

  // A dummy function lambda.
  auto my_func = [](int x) { 
    int x_ = x;
    int sleep_time = rand_sleep_range(100, 20000); // in ms.
    x *= 2;
    std::cout << " after sleeping " << sleep_time << "ms \t"
              << "f(" << x_ << ") = " << x << std::endl;
  };

  // Create and execute the container of futures.
  auto async_coll = async_all(my_func, my_vec);
  size_t count = get_async_all(async_coll);

  std::cout << std::endl << count << " items completed. \n";
}

所以,我的问题是:

  • 我使用的方法有什么问题吗?
  • get_async_all() 是否有比我正在使用的更好/更优雅的方法?或者我正在做的任何其他事情,就此而言。

感谢任何人花时间查看代码,并给我任何建设性的批评或反馈。

4

1 回答 1

3

至少有一个陷阱。您std::async在未指定启动策略的情况下调用,这意味着部分或全部任务可能会延迟运行。但是在您的测试中查看任务是否已完成,您只测试std::future_status_ready. 如果一个任务被推迟,你总是会回来std::future_status_deferred,这意味着你的测试永远不会返回 true。

这个问题的最简单的解决方案是指定一个启动策略std::launch::async,但是你会冒着超额订阅系统的风险。另一种方法是修改您的测试以检查延迟任务,但问题是如何处理它们。如果你打电话给他们getwait你会阻塞任意时间。

关于您的一般方法,与其在轮询它们时阻塞 10 毫秒以等待每个任务完成,不如考虑等待 0 毫秒,即进行纯轮询以查看任务是否完成。这可能会减少任务完成和处理它之间的延迟,但它可能会增加监听到的轮询,从而使整个系统运行速度变慢。

一种完全不同的方法可能是放弃轮询每个任务,而是让每个任务将“我完成”标志写入共享数据结构(例如 a std::deque),然后定期轮询该数据结构以查看其中是否有任何内容. 如果是这样,则处理已完成的任务,将它们从数据结构中删除,然后重新进入睡眠状态,直到再次轮询。如果您的任务push_back在数据结构上执行,您自然可以按照它们完成的顺序处理它们。这种设计的一个缺点是共享数据结构可能成为性能瓶颈。

于 2013-02-12T15:19:30.320 回答