我正在尝试组合一个 Cro 服务,该服务具有 react/whenever 阻止“在后台”消费数据因此与许多使用 Cro 的 websocket 使用示例不同,这与可以通过浏览器访问的路由无关。
我的用例是使用通过 MQTT 主题接收到的消息并对其进行一些处理。在开发的后期阶段,我可能会从这些数据中创建一个供应,但现在,当接收到数据时,它将存储在一个变量中,并根据某些条件,通过 http post 发送到另一个服务。
我的想法是像这样provider()
在Cro::HTTP::Server
设置中包含一个:
use Cro::HTTP::Log::File;
use Cro::HTTP::Server;
use Routes;
use DataProvider; # Here
my Cro::Service $http = Cro::HTTP::Server.new(
http => <1.1>,
host => ...,
port => ...,
application => [routes(), provider()], # Made this into an array of subs?
after => [
Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
]
);
在 DataProvider.pm6 中:
use MQTT::Client;
sub provider() is export {
my $mqtt = MQTT::Client.new: server => 'localhost';
react {
whenever $mqtt.subscribe('some/mqtt/topic') {
say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
}
}
}
这会引发一堆错误:
A react block:
in sub provider at DataProvider.pm6 (DataProvider) line 5
in block <unit> at service.p6 line 26
Died because of the exception:
Invocant of method 'write' must be an object instance of type
'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. Did
you forget a '.new'?
in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
in sub provider at DataProvider.pm6 (DataProvider) line 6
in block <unit> at service.p6 line 26
老实说,我完全猜想这就是我在 Cro 服务的后台订阅数据的方法,但我找不到任何关于什么可能被认为是推荐方法的信息。
最初我在主service.pm6
文件中有我的 react/whenever 块,但这似乎不正确。并且需要被包裹在一个start{}
块中,因为正如我刚刚了解到的那样,react 正在阻塞:) 并且 cro 无法真正启动。
但是遵循如何实现路由的模式似乎是合乎逻辑的,但我错过了一些东西。该错误涉及设置新方法,但我不相信这是根本原因。Routes.pm6
没有构造函数。
谁能指出我正确的方向?