2

最近我开始实现一个使用“观察者模式”的消息调度系统:这里没什么特别的。当我开发它时,我认为从“主题”发送“消息”对象会很好,这些对象可能彼此根本不同,并且可以从许多“观察者”那里读取。

这些不同的消息采取不同消息类别的形式(例如,想想“用户注销消息”、“屏幕模式切换”和“音量级别已更改”,所有这些都需要不同的信息),很快我发现“观察者" 不需要知道我想要创建的每条不同的信息(至少可以说,这将是……不可持续的)。相反,我希望每个观察者都能够对特定类型的消息做出反应。

因此,为了制作一些东西,我认为双重调度可能是我的选择。经过一点点我得到了这篇文章(c ++ 11只是因为for循环):

#include <iostream>
#include <vector>
#include <string>

/**
* A few forward declarations.
*/

class Message_base;
class Message_type_a;
class Message_type_b;

/**
* Base observer...
*/

class Observer_base
{
    public:

    /**
    * All these implementations are empty so we don't have to specify them
    * in every single derived class.
    */

    virtual void        get_message(const Message_base&) {}
    virtual void        get_message(const Message_type_a&) {}
    virtual void        get_message(const Message_type_b&) {}
};

/**
* Specification of all message types.
*/

class Message_base
{
    public:

    /**
    * This is the method that will implement the double dispatching in all
    * derived classes.
    */

    virtual void        be_recieved(Observer_base &r) const=0;  //Now that's a nasty method name.
};

class Message_type_a:public Message_base
{
    private:
    int integer_value;

    public:
                Message_type_a(int v):integer_value(v) {}
    int             get_integer_value() const {return integer_value;}
    void            be_recieved(Observer_base &r) const {r.get_message(*this);}

};

class Message_type_b:public Message_base
{
    private:
    std::string string_value;

    public:
                Message_type_b(const std::string v):string_value(v) {}
    std::string         get_string_value() const {return string_value;}
    void            be_recieved(Observer_base &r) const {r.get_message(*this);}
};

/**
* This is the base clase for the Subject... Notice that there are no virtual
* methods so we could as well instantiate this class instead of derive it.
*/

class Subject_base
{
    private:

    std::vector<Observer_base *>    observers;

    public:

    void            emit_message(const Message_base& m) {for(auto o : observers) m.be_recieved(*o);}    //Again, nasty to read since it's... backwards.
    void            register_observer(Observer_base * o) {observers.push_back(o);} 
};

/**
* Now we will create a subject class for the sake of it. We could just call the
* public "emit_message" from main passing Message objects.
*/

class Subject_derived:public Subject_base
{
    public:

    void            emit_message_a(int v) {emit_message(Message_type_a(v));}
    void            emit_message_b(const std::string v) {emit_message(Message_type_b(v));}
};

/**
* This gets fun... We make two observers. One will only get type_a messages
* and the other will get type_b.
*/

class Observer_type_a:public Observer_base
{
    private:

    int             index;  //We will use it to identify the observer.

    public:

                Observer_type_a(int i):index(i) {}
    void            get_message(const Message_type_a& m) {std::cout<<"Observer_type_a ["<<index<<"] : got type_a message : "<<m.get_integer_value()<<std::endl;}
};

class Observer_type_b:public Observer_base
{
    private:

    std::string         name; //Merely to identify the observer.

    public:

                Observer_type_b(const std::string& n):name(n) {}
    void            get_message(const Message_type_b& m) {std::cout<<"Observer_type_b ["<<name<<"] : got type_b message : "<<m.get_string_value()<<std::endl;}
};

/**
* Stitch all pieces together.
*/

int main(int argc, char ** argv)
{
    Observer_type_a o_a1(1);
    Observer_type_a o_a2(2);
    Observer_type_b o_b1("Sauron");
    Observer_type_b o_b2("Roverandom");

    Subject_derived s_a;

    s_a.register_observer(&o_a1);
    s_a.register_observer(&o_b1);

    s_a.emit_message_a(23);
    s_a.emit_message_b("this is my content");

    s_a.register_observer(&o_a2);
    s_a.register_observer(&o_b2);

    s_a.emit_message_a(99);
    s_a.emit_message_b("this is my second content");

    //gloriously exit.  
    return 0;
}

为了清楚起见,我将在这里说出我的目标:

  • 能够从主题发送许多不同的消息。
  • 让观察者专门化他们忽略不适合他们的每条消息(如果根本不发送它们会更好,但我知道我可以注册不同的观察者组)。
  • 避免 RTTI 和派生类转换。

我的问题来了:我是否错过了一个更简单的实现来实现我的目标?

值得一提的是,将使用此系统的系统不会有那么多观察者,可能同时出现的对象可能少于十个。

4

2 回答 2

4

一些元编程样板:

// a bundle of types:
template<class...>struct types{using type=types;};

// a type that does nothing but carry a type around
// without being that type:
template<class T>struct tag{using type=T;};

// a template that undoes the `tag` operation above:
template<class Tag>using type_t=typename Tag::type;

// a shorter way to say `std::integral_constant<size_t, x>`:
template<std::size_t i>struct index:std::integral_constant<std::size_t, i>{};

获取 a 中类型的索引types<...>

// this code takes a type T, and a types<...> and returns
// the index of the type in there.
// index_of
namespace details {
  template<class T, class Types>
  struct index_of{};
}
template<class T, class Types>
using index_of_t=type_t<details::index_of<T,Types>>;
namespace details {
  // if the first entry in the list of types is T,
  // our value is 0
  template<class T, class...Ts>struct index_of<T, types<T,Ts...>>:
    tag< index<0> >
  {};
  // otherwise, it is 1 plus our value on the tail of the list:
  template<class T, class T0, class...Ts>
  struct index_of<T, types<T0, Ts...>>:
    tag< index< index_of_t<T,types<Ts...>{}+1 > >
  {};
}

这是一个单一的“频道”广播器(它发送一种消息):

// a token is a shared pointer to anything
// below, it tends to be a shared pointer to a std::function
// all we care about is the lifetime, however:
using token = std::shared_ptr<void>;
template<class M>
struct broadcaster {
  // f is the type of something that can eat our message:
  using f = std::function< void(M) >;
  // we keep a vector of weak pointers to people who can eat
  // our message.  This lets them manage lifetime independently:
  std::vector<std::weak_ptr<f>> listeners;

  // reg is register.  You pass in a function to eat the message
  // it returns a token.  So long as the token, or a copy of it,
  // survives, broadcaster will continue to send stuff at the
  // function you pass in:
  token reg( f target ) {
    // if thread safe, (write)lock here
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back( sp );
    return sp;
    // unlock here
  }
  // removes dead listeners:
  void trim() {
    // if thread safe, (try write)lock here
    // and/or have trim take a lock as an argument
    listeners.erase(
      std::remove_if( begin(listeners), end(listeners), [](auto&& p){
        return p.expired();
      } ),
      listeners.end()
    );
    // unlock here
  }
  // Sends a message M m to every listener who is not dead:
  void send( M m ) {
    trim(); // remove dead listeners
    // (read) lock here
    auto tmp_copy = listeners; // copy the listeners, just in case
    // unlock here

    for (auto w:tmp_copy) {
      auto p = w.lock();
      if (p) (*p)(m);
    }
  }
};

这是一个多通道subject,可以支持任意数量的不同消息类型(在编译时确定)。如果您无法匹配消息类型,send和/或reg将无法编译。您有责任决定消息是 aconst&还是 value 或其他什么。尝试reg右值消息是行不通的。它的目的是明确地传递M给。regsend

// fancy wrapper around a tuple of broadcasters:
template<class...Ts>
struct subject {
  std::tuple<broadcaster<Ts>...> stations;
  // helper function that gets a broadcaster compatible
  // with a message type M:
  template<class M>
  broadcaster<M>& station() {
    return std::get< index_of_t<M, types<Ts...>>{} >( stations );
  }
  // register a message of type M.  You should call with M explicit usually:
  template<class M>
  token reg( std::function<void(M)> listener ) {
    return station<M>().reg(std::move(listener));
  }
  // send a message of type M.  You should explicitly pass M usually:
  template<class M>
  void send( M m ) {
    station<M>().send(std::forward<M>(m));
  }
};

活生生的例子

当你reg,它返回一个token,又名std::shared_ptr<void>。只要这个令牌(或副本)存在,消息就会流动。如果它消失,则发送到 reged 回调的消息将结束。通常,这意味着侦听器应该维护一个std::vector<token>, 和 reg 的 lambdas,它们会随意使用this

在 C++14/1z 中,上面的内容变得更好了(我们可以去掉types<...>and index_of)。

如果在广播周期中添加监听器,它不会被发送到。如果您在广播周期中删除了一个侦听器,它将不会被发送到您删除它的点之后。

线程安全注释是为广播器上的读/写锁设置的。

为给定广播者的死听众分配的内存在trim或被send调用时被回收。但是,std::function意志早就被破坏了,所以只有有限的内存被浪费了,直到下一次send。然后我就这样做了,因为无论如何我们都要遍历消息列表,所以还不如先清理任何乱七八糟的东西。

该解决方案没有 RTTI 或动态转换,消息仅发送给理解它们的听众。


中,事情变得更简单了。删除所有元编程样板,删除subject(保留broadcaster)并执行此操作以处理多个通道:

template<class...Ms>
struct broadcasters : broadcaster<Ms>... {
  using broadcaster<Ms>::reg...;
  using broadcaster<Ms>::send...;

  template<class M>
  broadcaster<M>& station() { return *this; }
};

现在这broadcasters几乎是对subject上面的改进。

由于std::functionreg以来的改进,除非信号选项过于相似,否则该函数通常会做正确的事情。如果你确实遇到了regor的问题send,你就不得不打电话了.station<type>().reg(blah)

但是 99/100 次你可以只做 a.reg( lambda )并且.send( msg )重载决议做正确的事。

活生生的例子

这是整个系统增加了一个模块化的插入式线程安全系统:

struct not_thread_safe {
    struct not_lock {~not_lock(){}};
    auto lock() const { return not_lock{}; }
};
struct mutex_thread_safe {
    auto lock() const { return std::unique_lock<std::mutex>(m); }
private:
    mutable std::mutex m;
};
struct rw_thread_safe {
    auto lock() { return std::unique_lock<std::shared_timed_mutex>(m); }
    auto lock() const { return std::shared_lock<std::shared_timed_mutex>(m); }
private:
    mutable std::shared_timed_mutex m;
};
template<class D, class>
struct derived_ts {
    auto lock() { return static_cast<D*>(this)->lock(); }
    auto lock() const { return static_cast<D const*>(this)->lock(); }
};
using token = std::shared_ptr<void>;
template<class M, class TS=not_thread_safe>

struct broadcaster:
  TS
{
  using f = std::function< void(M) >;
  mutable std::vector<std::weak_ptr<f>> listeners;
  token reg( f target )
  {
    auto l = this->lock();
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back( sp );
    return sp;
  }
  // logically const, but not really:
  void trim() const {
    auto l = const_cast<broadcaster&>(*this).lock();
    auto it = std::remove_if( listeners.begin(), listeners.end(), [](auto&& p){
      return p.expired();
    } );
    listeners.erase( it, listeners.end() );
  }
  // logically const, but not really:
  void send( M m ) const
  {
    trim(); // remove dead listeners
    auto tmp_copy = [this]{
      auto l = this->lock();
      return listeners; // copy the listeners, just in case
    }();

    for (auto w:tmp_copy) {
      auto p = w.lock();
      if (p) (*p)(m);
    }
  }
};
template<class TS, class...Ms>
struct basic_broadcasters :
    TS,
    broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >... 
{
  using TS::lock;
  using broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >::reg...;
  using broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >::send...;

  template<class M>
  broadcaster<M, derived_ts<basic_broadcasters<TS, Ms...>, M>>& station() { return *this; }
  template<class M>
  broadcaster<M, derived_ts<basic_broadcasters<TS, Ms...>, M>> const& station() const { return *this; }
};
template<class...Ms>
using broadcasters = basic_broadcasters<rw_thread_safe, Ms...>;

活生生的例子

broadcasters<Messages...>现在是一个读写锁定的广播类,它使用 1 个通用共享锁来同步每个广播队列。

basic_broadcasters<not_thread_safe, Messages...>而是创建一个没有锁定的(即不是线程安全的)。

于 2015-08-18T20:10:59.543 回答
1

我认为你应该坚持更简单的方法。如果您的所有观察者都处理所有消息,那么您必须有一种观察者类型。如果消息不相关,则每个观察者必须只观察它处理的消息。

使用 Boost::Signal2 的解决方案是:

#include <string>
#include <cstdio>
#include <iostream>
#include <functional>
#include <boost/signals2/signal.hpp>

class Subject
{
public:
    void emit_message_a(int v) {
        sig_a(v);
    }

    void emit_message_b(const std::string v) {
        sig_b(v);
    }

    template<typename F>
    void register_listener_a(const F &listener)
    {
        sig_a.connect(listener);
    }

    template<typename F>
    void register_listener_b(const F &listener)
    {
        sig_b.connect(listener);
    }

private:
    boost::signals2::signal<void (int)> sig_a;
    boost::signals2::signal<void (std::string)> sig_b;
};

class Observer
{
public:
    Observer():
        name("John")
    {}

    void observe(int v) {
        std::cout << name << " has observed phoenomenon int: " << v << std::endl;
    }

    void observe(std::string v) {
        std::cout << name << " has observed phoenomenon string: " << v << std::endl;
    }

private:
    std::string name;
};

int main()
{
    Subject s;
    Observer o;

    s.register_listener_a([&](int v){o.observe(v);});
    s.register_listener_b([&](std::string v){o.observe(v);});


    s.register_listener_a([](int val) {
        std::cout << "Received message a : " << val << std::endl;
    });
    s.register_listener_a([](int message_a) {
        printf("I have received message a, too! It is %d.\n", message_a);
    });

    s.register_listener_b([](std::string msg) {
        std::cout << "A B type message was received! Help!\n";
    });

    s.emit_message_a(42);

    s.emit_message_b("a string");

    s.emit_message_a(-1);

    s.emit_message_b("another string");
}

运行它,我得到:

John has observed phoenomenon int: 42
Received message a : 42
I have received message a, too! It is 42.
John has observed phoenomenon string: a string
A B type message was received! Help!
John has observed phoenomenon int: -1
Received message a : -1
I have received message a, too! It is -1.
John has observed phoenomenon string: another string
A B type message was received! Help!

如果您要使用它,请务必阅读手册

于 2015-08-18T20:09:51.877 回答