1

我正在努力学习 ppl。而不是使用线程。让我们从头开始。我有这个简单的问题:

v1:

            while(true)
            {
                auto important_msg = ReceiveImportantMessage_Blocking(); //Blocks for about 20s until there is a new message
                //do processing on important_msg
                auto unimportant_msg = ReceiveUnimportantMessage_Blocking(); //Blocks for about 60s until there is a new message
                //do processing on unimportant_msg
            }

v1 显然很糟糕,因为这两个调用都会阻塞并最终相互等待。

v2:

            while(true)
            {
                auto important_msg = ReceiveImportantMessage_Blocking(); //Blocks for about 20s until there is a new message
                //do processing on important_msg
                auto unimportant_msg = CheckForUnimportantMessage_NonBlocking(); //works by polling queue. Returns empty if no message
                if(unimportant_msg) {
                    //do processing on unimportant_msg
                }
            }

v2 更好,因为不重要的消息不会阻塞重要的消息。此外,重要消息在收到后也会在实例中进行处理。尽管在我们收到重要消息之前不会检查不重要的消息。因此,当我收到这条不重要的信息时,它可能已经 20 多岁了。

v3:

            while(true)
            {
                auto important_msg = CheckForImportantMessage_NonBlocking(); //works by polling queue. Returns empty if no message
                if(important_msg) {
                    //do processing on important_msg
                }   
                auto unimportant_msg = CheckForUnimportantMessage_NonBlocking(); //works by polling queue. Returns empty if no message
                if(unimportant_msg) {
                    //do processing on unimportant_msg
                }
                sleep(10); //just so we don't busy wait.    
            }

v3 更快地获取不重要的消息。但它对于重要消息的速度也较慢。重要消息的处理不会在收到后立即进行。但只有当我四处检查时。由于我添加了睡眠以避免忙等待(并消耗过多的 cpu 时间),因此接收和处理重要消息的时间将比使用 v2.0 更长。

v4:

            {
                auto important_msg_task = ReceiveImportantMessageTask_NonBlocking(); //ppl::task
                auto unimportant_msg_task = ReceiveUnimportantMessageTask_NonBlocking(); //ppl::task
                while(true)
                {
                    if(important_msg_task.is_done()) {
                        auto important_msg = important_msg_task.get();
                        //do processing on important_msg
                        important_msg_task = ReceiveImportantMessageTask_NonBlocking(); //listen for new message
                    }
                    if(unimportant_msg_task.is_done()) {
                        auto unimportant_msg = unimportant_msg_task.get();
                        //do processing on important_msg
                        unimportant_msg_task = ReceiveUnimportantMessageTask_NonBlocking(); //listen for new message
                    }
                    sleep(10); //just so we don't busy wait.    
                }
            }

V4 与 v3 相同。只是用 ppl 任务代替。它的问题是一旦收到重要消息就不能立即处理。

v5)我想删除睡眠并使用important_msg_task“.then”在收到它后触发处理,“.then”在处理旧消息后侦听新消息,“.then”处理新消息等(对 unimportant_msg_task 执行相同的操作)。我不知道如何在循环中完成此操作。看来我最终会得到一个不断增长的连接任务链,一个接一个,永远。

那么你如何用 ppl 解决这个问题(或者至少没有原始​​线程)?

4

1 回答 1

3

使用这种惯用代码:

template<typename Func>
concurrency::task<void> doWhile(Func func)
{
  static_assert(
    std::is_same_v<decltype(func()), bool> ||
    std::is_same_v<decltype(func()), concurrency::task<bool>>);

  return concurrency::create_task(func)

    .then([func](bool needToContinue)
    {
      if (needToContinue)
        return doWhile(func);

      return concurrency::task_from_result();
    });
}

对于您的一项任务,您可以编写如下内容:

concurrency::task<bool> process() {
  // launching initial task
  auto important_msg_task = ReceiveImportantMessageTask_NonBlocking(); //ppl::task

  // adding continuation with processing of the result
  auto continuation = important_msg_task.then([](const Message &msg)
  {
    // do processing on important msg

    // decide whether to continue or stop processing
    return stopProcessing() ? false : true;
  });

  return continuation;
}

auto loop = doWhile([] {
  return process();
});

用于loop.get()等待处理完成。

要拥有两个并行处理“循环”,只需为另一个“循环”触发第二个doWhile,例如:

auto loop2 = doWhile([] {
  return ReceiveUnimportantMessageTask_NonBlocking()
    .then([](const Message &msg)
    {
      //do processing on unimportant msg
      return !stopProcessing();
    });
});

由于所有任务都在一个线程池上执行,因此有几个doWhiles 可以有效地使处理并行化。

于 2019-12-26T11:42:15.210 回答