因此,我正在尝试创建一种通用方式来创建期货容器,并以非阻塞方式执行所有 future.get()'。
我预计任务的完成时间应该在几百毫秒之间,通常最多 2 分钟。然而,有些可能根本无法完成。在典型的运行中将至少执行 10,000 个任务。
我希望返回最快的任务结果,而不会被期货容器中其他更长时间运行的任务所阻碍。
到目前为止,这是我使用虚拟睡眠时间来模拟任务完成延迟的内容(设计在很大程度上要归功于这里发表的好帖子,例如this和this):
#include <future>
#include <vector>
#include <iostream>
#include <random>
#include <chrono>
#include <ratio>
#include <thread>
#include <algorithm>
size_t rand_from_range(const size_t, const size_t);
int rand_sleep_range(const size_t, const size_t);
template<class CT> size_t get_async_all( CT& );
// Given a function and a collection,
// return a vector of futures.
template<class Function, class CT>
auto async_all( Function f, CT coll )
-> std::vector<decltype(std::async(f, *std::begin(coll)))>
{
std::vector<decltype(std::async(f, *std::begin(coll)))> futures;
futures.reserve(coll.size());
for (auto& element : coll)
futures.push_back(std::async(f, element));
return futures;
}
// Given the beginning and end of a number
// range, return a random number therein.
size_t rand_from_range( const size_t range_begin,
const size_t range_end )
{
std::uniform_int_distribution<size_t>
distr(range_begin, range_end);
std::random_device dev;
return distr(dev);
}
// Given a shortest and longest duration, put the calling
// thread to sleep for a random duration therein.
// (in milliseconds)
int rand_sleep_range( const size_t shortest_time,
const size_t longest_time )
{
std::chrono::milliseconds
sleep_time(rand_from_range(shortest_time, longest_time));
std::this_thread::sleep_for(sleep_time);
return (int)sleep_time.count();
}
// Given a container of futures, perform all
// get()'s.
template<class CT>
size_t get_async_all( CT& async_coll )
{
size_t get_ctr(0);
const size_t future_cnt = async_coll.size();
std::vector<size_t> completed;
completed.reserve(future_cnt);
while (true) {
for (size_t ndx = 0; ndx < future_cnt; ++ndx) {
// Check to see if this ndx' future has completed already.
if (std::none_of(std::begin(completed), std::end(completed),
[=](size_t x) {
return (x == ndx);
}))
{ // No, this one hasn't completed
// yet, attempt to process it.
auto& f = async_coll[ndx];
if (f.wait_for(std::chrono::milliseconds(10))
== std::future_status::ready)
{
f.get(); // The future's work gets done here.
++get_ctr;
completed.push_back(ndx);
if (completed.size() == future_cnt)
break; // for()
}
}
}
if (completed.size() == future_cnt)
break; // while()
}
return get_ctr;
}
int main()
{
// A dummy container of ints.
std::vector<int> my_vec(100);
for (auto& elem : my_vec)
elem = rand_from_range(1, 100);
// A dummy function lambda.
auto my_func = [](int x) {
int x_ = x;
int sleep_time = rand_sleep_range(100, 20000); // in ms.
x *= 2;
std::cout << " after sleeping " << sleep_time << "ms \t"
<< "f(" << x_ << ") = " << x << std::endl;
};
// Create and execute the container of futures.
auto async_coll = async_all(my_func, my_vec);
size_t count = get_async_all(async_coll);
std::cout << std::endl << count << " items completed. \n";
}
所以,我的问题是:
- 我使用的方法有什么问题吗?
- get_async_all() 是否有比我正在使用的更好/更优雅的方法?或者我正在做的任何其他事情,就此而言。
感谢任何人花时间查看代码,并给我任何建设性的批评或反馈。