3

我有一个基于消息泵线程池架构的应用程序。每当有可能阻塞的动作时,它都会被实现为“完成/触发 evnet 时的回调”动作,因此它不会停止正在执行的线程。

虽然这种技术适用于大多数情况,但在某些情况下它会变得非常不方便并且会使代码过于复杂。

我想做的是,在等待时以透明的方式继续处理事件,而不会将功能分解为等待前/等待后的部分。

我该怎么做?

我有两个选择:

  1. 在等待时从正在执行的函数中运行消息循环。
  2. 在等待时创建一个新的工作线程,并在恢复时(以适当的方式)终止它。

这两种选择都有其缺陷,仅举几例:

对于 1:

  • 可能会导致堆栈溢出。
  • 可能最终陷入僵局。
  • 如果内部消息导致等待第二个事件完成,而外部事件同时完成,则外部函数在第二个事件完成之前无法继续,这种情况可能会扩大。

选项 2 最终会导致创建越来越多的线程。

当然,可能还有其他我没有想到的选择。

编辑:语言是 C++,所以函数不能以简单(便携?)的方式进出。平台是 Windows (API),虽然我不认为它是相关的。

4

5 回答 5

2

对于可移植的 C++,这是不行的,但是既然你提到你的平台是 Windows,为什么不使用MsgWaitForMultipleObjects呢?它的目的是让你完全按照你的问题所说的去做——在等待的时候继续发送消息。

于 2009-06-22T11:09:29.840 回答
0

EDIT: You mention not wanting to "breaking the function up into pre/post waiting parts."

What language are you developing in? If it has continuations (yield return in C#) then that provides a way to write code that appears to be procedural but which can easily be paused until a blocking operation makes its completion callback.

Here's an article about the idea: http://msdn.microsoft.com/en-us/magazine/cc546608.aspx

UPDATE:

Unfortunatly, the language is C++

That would make a great T-shirt slogan.

Okay, so you might find it helpful to structure your sequential code as a state-machine, so it becomes interrupt/resume-capable.

e.g. your pain is needing to write two functions, the one that initiates and the one that acts as the handler for the completion event:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;
    begin_sending_string_somehow(msg, greeting_sent_okay);
}

void greeting_sent_okay()
{
    std::cout << "Greeting has been sent successfully." << std::endl;
}

Your idea was to wait:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;

    waiter w;
    begin_sending_string_somehow(msg, w);
    w.wait_for_completion();

    std::cout << "Greeting has been sent successfully." << std::endl;
}

In that example, waiter overloads operator() so it can serve as a callback, and wait_for_completion somehow hangs up until it sees that the operator() has been called.

I'm assuming that begin_sending_string_somehow's second parameter is a template parameter that can be any callable type accepting no parameters.

But as you say, this has drawbacks. Any time a thread is waiting like that, you've added another potential deadlock, and you are also consuming the "resource" of a whole thread and its stack, meaning that more threads will have to be created elsewhere to allow work to be done, which is contradictory to the whole point of a thread pool.

So instead, write a class:

class send_greeting
{
    int state_;
    std::string msg_;

public:
    send_greeting(const std::string &msg)
        : state_(0), msg_(msg) {}

    void operator()
    {
        switch (state_++)
        {
            case 0:
                std::cout << "Sending the greeting" << std::endl;
                begin_sending_string_somehow(msg, *this);
                break;

            case 1:
                std::cout << "Greeting has been sent successfully." 
                          << std::endl;
                break;
        }
    }
};

The class implements the function call operator (). Each time it is called, it executes the next step in the logic. (Of course, being such a trivial example, this now is mostly state management noise, but in a more complex example with four or five states it may help clarify the sequential nature of the code).

Problems:

  • If the event callback function signature has special parameters, you'll need to add another overload of operator() that stores the parameters in extra fields and then calls onto the parameterless overload. Then it starts to get messy because those fields will be accessible at compile-time in the initial state, even though they are not meaningful at runtime in that state.

  • How do objects of the class get constructed and deleted? The object has to survive until the operation completes or is abandoned... the central pitfall of C++. I'd recommend implementing a general scheme to manage it. Create a list of "things that will need to be deleted" and ensure that this happens automatically at certain safe points, i.e. try to get as close as possible to GC as you can. The further away you are from that, the more memory you will leak.

于 2009-06-20T23:47:48.033 回答
0

您的问题是同步线程对吗?如果这是您的问题,为什么不使用互斥锁?它可以用一个接口包裹起来。事实上,您可以使用 PIMPL 习惯用法来使互斥锁可移植。

http://msdn.microsoft.com/en-us/library/system.threading.mutex(VS.71).aspx

于 2009-06-23T17:01:21.690 回答
0

看来您的问题是根本性的,与 C++ 无关。其他语言在隐藏堆栈使用方面可能更好,但只要你没有从 Foo() 返回,你就需要 Foo() 的调用堆栈。如果你也在执行 Bar(),那也需要一个调用栈。

线程是一种很好的方法,因为每个线程都有自己的调用堆栈。延续是保存调用堆栈的一种聪明但复杂的方法,因此在可用的情况下,它们也是一种选择。但是,如果您不想要这些,则必须使用一个调用堆栈。

使用一个调用堆栈需要解决重入问题。在这里,没有关于什么是可能的通用答案。通常,您将拥有一组由函数 F1...Fy 处理的消息 M1..Mx,并带有一些特定于应用程序和可能依赖于状态的映射。使用可重入消息循环,当您收到 Mj 时,您可能正在执行 Fi。现在的问题是该怎么做。并非所有函数 F1...Fn 都可以调用;特别是 Fi 本身可能无法调用。然而,一些其他功能也可能不可用,例如因为它们共享资源。这取决于应用程序。

如果 Mj 的处理需要任何这些不可用的功能,您必须推迟它。你能接受队列中的下一条消息吗?同样,这取决于实现,它甚至可能与消息类型和内容有关。如果消息足够独立,则可以乱序执行它们。这很快变得相当复杂 - 要确定是否可以接受队列中的第 N 条消息,您必须检查它是否可以相对于前面的 N-1 条消息乱序执行。

一种语言可以通过不隐藏依赖关系来帮助您,但最终您必须做出明确的决定。没有灵丹妙药。

于 2009-06-22T10:51:32.077 回答
0

在不了解您的特定应用程序的更多信息(即处理消息需要多长时间等)的情况下,将会有很多挥手:

  • 这是托管还是非托管 C++?

  • 您使用的是哪个线程池?

    • 队列用户工作项?
    • 您自己的池通过 CreateIoCompletionPort?
    • 还是 Vista 的 SubmitThreadpoolWork?

我认为平台有点相关,因为线程池的性质很重要。

例如:

如果您将(完成端口)用于您的线程池(即 CreateIoCompletionPort)。您可以控制并发运行的线程数(以及最终创建的总线程数)。如果您将最大并发线程数设置为 4。Windows 将尝试只允许 4 个线程同时运行。如果所有 4 个线程都忙于处理并且您将第 5 个项目排队,那么 Windows 将不允许该项目运行,直到 4 个线程之一完成(重用线程)。只有当线程被阻塞(即等待 I/O)时,这条规则被打破,然后更多的线程被允许运行。

这是了解完成端口的重要事项,以及为什么平台是相关的。如果不涉及内核,很难实现这样的事情。了解繁忙线程和阻塞线程之间的区别需要访问线程状态。完成端口对于进入内核的上下文切换数量也非常有效。

回到你的问题:

似乎您应该有一个线程来处理/发送消息,并且消息处理都是通过将工作人员推入线程池来处理的。让完成端口处理负载平衡和并发。您的消息处理循环永远不会阻塞并且可以继续处理消息。

如果传入消息的速率远远超出您处理它们的能力,那么您可能必须注意队列大小并在它变得太大时阻塞。

于 2009-06-21T15:16:09.563 回答