我已经让命令总线启动并运行了一段时间,并开发了很多我的应用程序。现在它们是我希望异步处理的一些命令。我已经添加了 BernardMessageProducer,这一切似乎都有效。我只想将窗帘命令发送到异步总线,其余的在本地处理。
这可能吗,我该怎么做。我已经写了一个命令路由器,但是在命令总线处理完消息之后。
谢谢
从自述文件
如果你想设置一个异步处理所有消息的总线,你可以通过将一个 Prooph\ServiceBus\Plugin\MessageProducerPlugin 与你选择的消息生产者初始化连接到消息总线来实现。
让我们看一个使用 psb-zeromq-producer 的简单示例
//app bootstrap
$container = new Container;
$container['config'] = [
'prooph' => [
'zeromq_producer' => [
'dsn' => 'tcp://127.0.0.1:5555', // ZMQ Server Address.
'persistent_id' => 'example', // ZMQ Persistent ID to keep connections alive between requests.
'rpc' => false, // Use as Query Bus.
]
]
];
$factory = \Prooph\ServiceBus\Message\ZeroMQ\Container\ZeroMQMessageProducerFactory;
$zmqProducer = $factory($container);
$commandBus = new \Prooph\ServiceBus\CommandBus();
$messageProducerForwarder = new \Prooph\ServiceBus\Plugin\MessageProducerPlugin($zmqProducer);
$commandBus->utilize($messageProducerForwarder);
$echoText = new ExampleCommand('It works');
$commandBus->dispatch($echoText);
您还可以使用消息路由器插件将单个消息路由到消息生产者。
注意:如果您想异步处理特定命名空间的所有消息,Prooph\ServiceBus\Plugin\Router\RegexRouter 是一个不错的选择。
我不确定如何使用 prooph 执行此操作(尤其是因为您没有提供任何代码示例),但一般来说:在Command Bus文档中的 Mathias Noback 的 MessageBus-repository 中可以找到一个好的方法。
您可以创建一个中间件来检查,例如标记接口(如上面链接的示例):
public function handle($message, callable $next)
{
if ($message instanceof IsHandledAsynchronously) {
// handle the message asynchronously using a message queue
$this->messageQueue->add($message);
} else {
// handle the message synchronously, i.e. right-away
$next($message);
}
}
然后只需通过让它实现正确的接口并在命令总线中的正确位置附加中间件来标记您的命令。
如果您的问题表明您有多个命令总线。然后你可能想要某种与命令匹配的CommandResolver,例如通过它的类名到适当的命令总线。再次查看 Mathias Noback 的文档,尤其是在同一文档中定义命令处理程序映射和DelegatesToMessageHandlerInterface部分