12

我正在尝试组合一个 Cro 服务,该服务具有 react/whenever 阻止“在后台”消费数据因此与许多使用 Cro 的 websocket 使用示例不同,这与可以通过浏览器访问的路由无关。

我的用例是使用通过 MQTT 主题接收到的消息并对其进行一些处理。在开发的后期阶段,我可能会从这些数据中创建一个供应,但现在,当接收到数据时,它将存储在一个变量中,并根据某些条件,通过 http post 发送到另一个服务。

我的想法是像这样provider()Cro::HTTP::Server设置中包含一个:

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use DataProvider; # Here

my Cro::Service $http = Cro::HTTP::Server.new(
        http => <1.1>,
        host => ...,
        port => ...,
        application => [routes(), provider()], # Made this into an array of subs?
        after => [
            Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
        ]
    );

在 DataProvider.pm6 中:

use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {
        whenever $mqtt.subscribe('some/mqtt/topic') {
            say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
        }
    }
}

这会引发一堆错误:

A react block:
  in sub provider at DataProvider.pm6 (DataProvider) line 5
  in block <unit> at service.p6 line 26

Died because of the exception:
    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'.  Did
    you forget a '.new'?
      in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
      in sub provider at DataProvider.pm6 (DataProvider) line 6
      in block <unit> at service.p6 line 26

老实说,我完全猜想这就是我在 Cro 服务的后台订阅数据的方法,但我找不到任何关于什么可能被认为是推荐方法的信息。

最初我在主service.pm6文件中有我的 react/whenever 块,但这似乎不正确。并且需要被包裹在一个start{}块中,因为正如我刚刚了解到的那样,react 正在阻塞:) 并且 cro 无法真正启动。

但是遵循如何实现路由的模式似乎是合乎逻辑的,但我错过了一些东西。该错误涉及设置新方法,但我不相信这是根本原因。Routes.pm6没有构造函数。

谁能指出我正确的方向?

4

3 回答 3

8

感谢所有提供信息的人,这是一次非常有价值的学习练习。

router()application参数中传递额外的子例程的方法Cro::HTTP::Server.new给了进一步的麻烦。(不允许使用数组,并且破坏了路由)

相反,我将后台工作移到了它自己的一个类中,并给它一个startstop类似于Cro::HTTP::Server.

我的新方法:

服务.pm6

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use KlineDataSubscriber; # Moved mqtt functionality here 
use Database;

my $dsn         = "host=localhost port=5432 dbname=act user=.. password=..";
my $dbh         = Database.new :$dsn;

my $mqtt-host   = 'localhost';
my $subscriber  = KlineDataSubscriber.new :$mqtt-host;

$subscriber.start; # Inspired by $http.start below

my Cro::Service $http = Cro::HTTP::Server.new(
    http => <1.1>,
    host => ...,
    port => ...,
    application => routes($dbh), # Basically back the way it was originally 
    after => [
        Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
    ]
);

$http.start;
say "Listening at...";
react {
    whenever signal(SIGINT) {
        say "Shutting down...";
        $subscriber.stop;
        $http.stop;
        done;
    }
}

在 KlineDataSubscriber.pm6

use MQTT::Client;

class KlineDataSubscriber {
    has Str $.mqtt-host is required;
    has MQTT::Client $.mqtt = Nil;

    submethod TWEAK() {
        $!mqtt = MQTT::Client.new: server => $!mqtt-host;
        await $!mqtt.connect;
    }

    method start(Str $topic = 'act/feed/exchange/binance/kline-closed/+/json') {
        start {
            react {
                whenever $!mqtt.subscribe($topic) {
                    say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
                }
            }
        }
    }

    method stop() {
        # TODO Figure how to unsubscribe and cleanup nicely
    }
}

这对我来说感觉更像“Cro 惯用语”,但我很乐意得到纠正。更重要的是,它按预期工作,我觉得有点面向未来。我应该能够创建一个供应以使路由器可以使用实时数据,并将数据推送到任何连接的 Web 客户端。

我还打算有一个/status带有各种检查的 http GET 端点,以确保一切正常

于 2021-02-18T07:07:23.670 回答
7

根本原因

该错误涉及设置new方法,但我不相信这是根本原因。

这不是建立一种新方法。这是关于应该定义而不是未定义的值。这通常意味着无法尝试初始化它,这通常意味着无法调用 .new.

谁能指出我正确的方向?

希望这个问题有所帮助。

查找有关推荐方法的信息

我完全猜想这就是我在 Cro 服务的后台订阅数据的方式,但我找不到任何关于什么可能被认为是推荐的方法的信息。

列出您从Cro 入门中遵循的快速入门步骤可能会对您有所帮助,包括基础知识以及最后的“了解”步骤。

错误信息

A react block:
  in sub provider ...

Died because of the exception:
    ... 
      in method subscribe ...

错误消息以内置react构造报告它捕获异常开始(并通过抛出自己的异常作为响应来处理它)。提供了与代码中出现的位置相对应的“回溯”react,从初始的“A react block:”缩进。

错误消息继续react构造总结其自身的异常 ( ) 并通过在后续行中报告原始异常(进一步缩进Died because ...)来解释自身。这包括另一个回溯,这次对应于原始异常,这很可能发生在具有不同调用堆栈的不同线程上。

(所有 Raku 的结构化多线程结构[1]都使用这种两部分错误报告方法来捕获它们并通过抛出另一个异常来处理异常。)


第一个回溯指示该react行:

in sub provider at DataProvider.pm6 (DataProvider) line 5
use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {

第二个回溯是关于原始异常:

    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. ...
      in method subscribe at ... (MQTT::Client) line 133

这报告write在第 133 行调用的方法MQTT::Client要求其调用者是“IO::Socket:: Async ”类型的实例。它得到的值那个类型的,但不是一个实例,而是一个“类型对象”。(非本机类型的所有值要么是类型对象,要么是其类型的实例。)。

错误消息的结尾是:

  Did you forget a '.new'?

这是一个简洁的提示,基于这样一个现实:在需要实例时遇到类型对象的原因有 100 次中有 99 次是代码未能初始化变量。(类型对象的用途之一是在 Perl 等语言中充当“未定义”的角色。)

那么,你能明白为什么应该是 'IO::Socket::Async' 的初始化实例的东西反而是未初始化的吗?

脚注

[1] Raku 的并行、并发和异步构造遵循结构化编程范式。请参阅Raku 中的并行性、并发性和异步性,了解 Jonathan Worthington 对这种整体方法的视频演示。像这样的结构化构造react可以清晰地观察、包含和管理在其执行范围内的任何地方发生的事件,包括错误异常等错误,即使它们发生在其他线程上也是如此。

于 2021-02-17T17:42:22.060 回答
3

你现在看起来很好,但是当我第一次看到这个时,我做了这个https://github.com/jonathanstowe/Cro-MQTT,它将 MQTT 客户端变成了一流的 Cro 服务。

我还没有发布它,但它可能是有启发性的。

于 2021-03-09T20:02:20.577 回答