我已经设置了 pipelinedb,效果很好!我想知道在视图中的值更新后是否可以从连续视图中流式传输数据?也就是说,让一些外部流程对视图的更改采取行动。
我希望将从视图生成的指标流式传输到仪表板中,并且我不想使用轮询数据库来实现这一点。
我已经设置了 pipelinedb,效果很好!我想知道在视图中的值更新后是否可以从连续视图中流式传输数据?也就是说,让一些外部流程对视图的更改采取行动。
我希望将从视图生成的指标流式传输到仪表板中,并且我不想使用轮询数据库来实现这一点。
从 0.9.5 开始,连续触发器已被删除,以支持使用输出流和连续转换。(首先由 DidacticTactic 提出)。连续视图的输出本质上是一个流,这意味着您可以基于它创建连续视图或转换。
简单示例:
CREATE STREAM s (
x int
);
CREATE CONTINUOUS VIEW hourly_cv AS
SELECT
hour(arrival_timestamp) AS ts,
SUM(x) AS sum
FROM s GROUP BY ts;
output_of
。在转换中,您可以访问元组old
,new
它们分别代表旧值和新值。(0.9.7 有第三个调用delta
)所以你可以创建一个使用 'hourly_cv' 输出的转换,如下所示:CREATE CONTINUOUS TRANSFORM hourly_ct AS
SELECT
(new).sum
FROM output_of('hourly_cv')
THEN EXECUTE PROCEDURE update();
update
我们仍然需要定义的。它需要是一个返回触发器的函数。CREATE OR REPLACE FUNCTION update()
RETURNS trigger AS
$$
BEGIN
// Do anything you want here.
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
我发现0.9.5 发布说明博客文章有助于理解输出流以及为什么不再有连续触发器。
查看我们的技术文档中有关输出流和连续转换的部分,以获取有关如何执行此操作的帮助,如果您需要文档中提供的帮助之外的帮助,请随时在我们的Gitter 频道中联系我们。
我觉得自己像个白痴,试图使用 Didactic 提供的工具来弄清楚这个问题的答案可能是什么样的。也许我是盲人,但我仍然没有找到方法。我找到了包含连续触发器的 9.3 版本的数据库,但它已被删除,我不希望切换到旧版本的数据库。
这有点令人难过,但我想它已从项目的开源版本中移出,以适应同一家公司提供的实时分析仪表板项目。
无论哪种方式。我通过使用存储过程解决了这个问题。与内置函数提供的功能相比,它的效率可能略低,但我每分钟要访问数据库几千次,而我的 VM CPU 和 RAM 只是对我打哈欠。
CREATE OR REPLACE FUNCTION all_insert(text,text)
RETURNS void AS
$BODY$
DECLARE
result text;
BEGIN
INSERT INTO all_in (streamid, generalinput) values($1, $2);
SELECT array_to_json(array_agg(json_build_object('streamId', streamid, 'total', count)))::text into result from totals;
PERFORM pg_notify('totals', result);
END
$BODY$
LANGUAGE plpgsql;
所以我的插入和通知是通过查询这个单一的存储过程来完成的。然后我的应用程序只需要监听 PSQL db 通知事件并适当地处理它们。在上面的示例中,应用程序将接收具有特定流 id 和与之关联的总数的 JSON 对象。