2

我正在使用std::asyncstd::futurefrom的组合C++ 11。我正在对我在代码中执行的某个活动强制执行超时,这在我尝试连接到服务器时可能需要一些时间。

以下是代码的样子:

#include <future>
#include <chrono>

std::size_t PotentiallyLongRunningActivity() {
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(10000s);
    return 10;
}

bool DoActivity() {

  bool activity_done = false;
  auto my_future_result(std::async(std::launch::async, []() {
      return PotentiallyLongRunningActivity(); //returns size_t
  }));

  std::future_status my_future_status = my_future_result.wait_for(std::chrono::milliseconds(800));
  if (my_future_status == std::future_status::timeout) {
      activity_done = false;
  }
  else if (my_future_status == std::future_status::ready) {
      if (my_future_result.valid() && my_future_result.get() > 0) {
          activity_done = true;
      }
  }

  return activity_done;
  //my_future_result hangs while exiting this method !!!
}

int main(int argc, char *argv[])
{
    DoActivity();
    return 0;
}

在大多数情况下,事情都很好。未来超时并在许多情况下报告已准备就绪。但是,我观察到的奇怪行为是,在某些情况下,UI 会挂起,因为my_future_result超出范围时会挂起。我通过重复调用来确认这一点,my_future_result.get()如果在退出方法之前调用它就不会返回。

我怎样才能解决这个问题?有什么方法可以取消或删除或终止std::future?

4

4 回答 4

6

您将在std::async任务完成之前退出您的功能。在某些情况下,a 的析构函数std::future会阻塞,直到任务完成。

http://en.cppreference.com/w/cpp/thread/future/wait_for

在此示例的文档中wait_for显示了对超时后的多次调用wait_for,表明超时的行为不会取消std::async任务。

没有允许在外部杀死线程的内置支持(我可以发现)。这是有道理的,因为如果线程以这种方式终止,则无法正确清理线程正在使用的系统资源的状态。

相反,最好将超时逻辑放在线程本身中,以便它可以自行终止并正确清理。

于 2017-02-16T17:34:47.877 回答
2

作为一般规则,失去对线程的跟踪是非常糟糕的。退出时让代码在另一个线程中运行main是未定义行为的原因。

因此std::future,返回的具有特殊属性,当它被销毁时它将std::async等待完成。std::async

这就是您所说的“挂起”。这不是挂起 - 它正在等待任务完成。

C++11 中的线程原语是原语;它们不是功能齐全的应用程序的完成类型。你可以用它们写线程池延迟任务队列等;天真地“原始地”使用它们往往会导致它们偏向于正确性,但不会给你想要的东西。

一个简单的线程池就是:

template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return std::move(r);
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};
struct thread_pool {
  thread_pool( std::size_t n = 1 ) { start_thread(n); }
  thread_pool( thread_pool&& ) = delete;
  thread_pool& operator=( thread_pool&& ) = delete;
  ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R()> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> run_task( F task ) {
    if (threads_active() >= total_threads()) {
      start_thread();
    }
    return queue_task( std::move(task) );
  }
  void terminate() {
    tasks.terminate();
  }
  std::size_t threads_active() const {
    return active;
  }
  std::size_t total_threads() const {
    return threads.size();
  }
  void clear_threads() {
    terminate();
    threads.clear();
  }
  void start_thread( std::size_t n = 1 ) {
    while(n-->0) {
      threads.push_back(
        std::async( std::launch::async,
          [this]{
            while(auto task = tasks.pop_front()) {
              ++active;
              try{
                (*task)();
              } catch(...) {
                --active;
                throw;
              }
              --active;
            }
          }
        )
      );
    }
  }
private:
  std::vector<std::future<void>> threads;
  threaded_queue<std::packaged_task<void()>> tasks;
  std::atomic<std::size_t> active;
};

活生生的例子

现在你在某个地方创建了一些线程池,向它扔任务,你可以等待有问题的期货。池中有有限数量的线程。

run_task将确保有一个线程来运行您排队的任何任务。 queue_task如果可用,将仅使用现有线程。

返回std::future<void>的不会阻塞任务完成;但是thread_pool对象的析构函数可以。

请注意,它将中止所有排队的任务,并等待当前正在运行的任务完成,默认情况下在销毁时。

包裹 aunique_ptr<thread_pool>的东西有助于轻松移动。必须禁用移动,因为活动线程保持指向-的指针this

thread_pool具有讽刺意味的是,它不是线程安全的;这是因为我们不保护std::vector<std::future<void>> threads;; 我的意思是,除了线程本身存储的线程安全之外。它被设计为仅由一个外部线程直接访问。

queue_task并且terminate是线程安全的,主要是偶然的。

于 2017-02-16T19:09:56.510 回答
2

取自cppreference sample,只有“开始”、“f2 完成”和“结束”会从此代码中打印出来(因为f1不会“挂起”):

#include <future>
#include <thread>
#include <iostream>

int main() {
    using namespace std::literals;

    {
        std::packaged_task<int()> task([]() {
            std::this_thread::sleep_for(5s);
            std::cout << "f1 finished" << std::endl;
            return 42;
        });
        std::future<int> f1 = task.get_future();
        std::thread(std::move(task)).detach();

        std::future<int> f2 = std::async(std::launch::async, []() {
            std::this_thread::sleep_for(3s);
            std::cout << "f2 finished" << std::endl;
            return 42;
        });

        f1.wait_for(1s);
        f2.wait_for(1s);
        std::cout << "the start" << std::endl;
    }

    // std::this_thread::sleep_for(7s);
    std::cout << "the end" << std::endl;
}

有关好的讨论,请参阅:http ://scottmeyers.blogspot.com.br/2013/03/stdfutures-from-stdasync-arent-special.html 。

C++ 标准库不支持线程终止操作。

小心你的线程detach。分离本身并不是“非常糟糕”,例如,它可能在用户可终止的守护进程中有用,或者如果您对编排和拆卸有其他想法。否则,detach标准库没有提供任何意义。

于 2017-02-16T18:09:16.307 回答
1

错误的原因是因为编译器没有被告知您的函数 DoModify() 的结果将是异步可用的(因此被声明为 std::future<> )并且它期望 bool 类型的同步结果没有'不是这样的。您可以使用 std::future::is_ready() 或 std::future_status 。这是一个示例代码片段

 std::future<size_t> DoActivity()
 {
      return std::async(std::launch::async, []()
            {
             return PotentiallyLongRunningActivity(); 
            });
 }

 int main()
 {
     auto result = DoActivity();
     if ( result. Is_ready() )
     {
         auto data = result.get();
         //do something with data
     }
 }
于 2017-02-17T12:45:00.657 回答