2

我已经设置了 pipelinedb,效果很好!我想知道在视图中的值更新后是否可以从连续视图中流式传输数据?也就是说,让一些外部流程对视图的更改采取行动。

我希望将从视图生成的指标流式传输到仪表板中,并且我不想使用轮询数据库来实现这一点。

4

3 回答 3

3

从 0.9.5 开始,连续触发器已被删除,以支持使用输出流和连续转换。(首先由 DidacticTactic 提出)。连续视图的输出本质上是一个流,这意味着您可以基于它创建连续视图或转换。

简单示例:

  1. 首先创建一个流和连续视图。
CREATE STREAM s (
    x int
);

CREATE CONTINUOUS VIEW hourly_cv AS
    SELECT
        hour(arrival_timestamp) AS ts,
        SUM(x) AS sum
    FROM s GROUP BY ts;
  1. 现在,每个连续视图都有一个输出流。您可以使用基于视图的输出创建转换output_of。在转换中,您可以访问元组oldnew它们分别代表旧值和新值。(0.9.7 有第三个调用delta)所以你可以创建一个使用 'hourly_cv' 输出的转换,如下所示:
CREATE CONTINUOUS TRANSFORM hourly_ct AS
    SELECT
        (new).sum
    FROM output_of('hourly_cv')
    THEN EXECUTE PROCEDURE update();
  1. 在这个例子中,我正在调用update我们仍然需要定义的。它需要是一个返回触发器的函数。
CREATE OR REPLACE FUNCTION update()
    RETURNS trigger AS
    $$
    BEGIN
        // Do anything you want here.
        RETURN NEW;
    END;
    $$
    LANGUAGE plpgsql;

我发现0.9.5 发布说明博客文章有助于理解输出流以及为什么不再有连续触发器。

于 2017-07-07T18:21:48.843 回答
2

查看我们的技术文档中有关输出流连续转换的部分,以获取有关如何执行此操作的帮助,如果您需要文档中提供的帮助之外的帮助,请随时在我们的Gitter 频道中联系我们。

于 2017-05-26T18:05:53.177 回答
1

我觉得自己像个白痴,试图使用 Didactic 提供的工具来弄清楚这个问题的答案可能是什么样的。也许我是盲人,但我仍然没有找到方法。我找到了包含连续触发器的 9.3 版本的数据库,但它已被删除,我不希望切换到旧版本的数据库。

这有点令人难过,但我想它已从项目的开源版本中移出,以适应同一家公司提供的实时分析仪表板项目。

无论哪种方式。我通过使用存储过程解决了这个问题。与内置函数提供的功能相比,它的效率可能略低,但我每分钟要访问数据库几千次,而我的 VM CPU 和 RAM 只是对我打哈欠。

CREATE OR REPLACE FUNCTION all_insert(text,text)
  RETURNS void AS
 $BODY$
DECLARE
    result text;
BEGIN
    INSERT INTO all_in (streamid, generalinput) values($1, $2);
    SELECT array_to_json(array_agg(json_build_object('streamId', streamid, 'total', count)))::text into result from totals;
    PERFORM pg_notify('totals', result);
END
$BODY$
LANGUAGE plpgsql;

所以我的插入和通知是通过查询这个单一的存储过程来完成的。然后我的应用程序只需要监听 PSQL db 通知事件并适当地处理它们。在上面的示例中,应用程序将接收具有特定流 id 和与之关联的总数的 JSON 对象。

于 2017-06-29T12:42:52.117 回答