0

我们使用 Pipeline DB 将数据接收到流式表中,并在两个流式视图中,在一个视图中过滤掉类型转换验证错误失败的记录,而在另一个视图中过滤掉类型转换错误失败的记录。理想情况下,我们试图将好记录与坏记录分开,并将它们具体化为两个决赛桌。

例如,系统被配置为以 YYYY/MM/DD HH24:MI:SS 格式接收来自第三方的数据,但由于某些原因,值显示在日期和月份被翻转的位置。在 PipelineDB 中,由于使用 PostGres SQL “to_timestamp(mycolumn,'YYYY/MM/DD HH24:MI:SS')” 如果“mycolumn”中的文本类似于“2019/15/05 13”,则会引发硬错误: 10:24'。并且在该事务中输入到流中的任何记录都会回滚。(因此,如果使用 PG Cop​​y,一条记录未能通过物化流视图会导致将零条记录全部插入。这在数据自动化中不是理想的场景,在这种情况下,第 3 方自动化系统可能不太关心我们要处理的问题它的数据。)

从我所看到的: - PostGres 没有“本地 SQL” 方法来执行“尝试解析” - PipelineDB 不支持用户定义的函数(如果我们编写了一个具有两个输出的函数,一个用于解析值,另一个返回布尔“is_valid”列)。(我的假设是该函数驻留在服务器上,而 pipelinedb 作为外部服务器执行,这完全是另一回事。)

理想情况下,如果函数有效,则函数返回类型转换值和布尔标志,并且可以在流视图的 WHERE 子句中使用它来从坏记录中分叉好记录。但我似乎无法解决这个问题?有什么想法吗?

4

1 回答 1

0

经过很多时间,我找到了解决这个问题的方法。我不喜欢它,但它会起作用。

在意识到整个问题取决于以下几点后,我才恍然大悟:

http://docs.pipelinedb.com/continuous-transforms.html “您可以将连续变换视为传入流数据之上的触发器,其中触发器函数针对连续变换输出的每个新行执行。内部函数作为 AFTER INSERT FOR EACH ROW 触发器执行,因此没有 OLD 行,而 NEW 行包含连续转换输出的行。”

我花了几个小时试图弄清楚:“为什么我写的自定义函数不起作用,我为传入数据流“尝试解析”数据类型?物化视图或输出表中不会显示任何内容?并且没有硬错误被 PipelineDB 抛出?然后几个小时后,我意识到问题与 PipelineDB 无法处理用户定义的函数这一事实有关,而是在持续转换中,表示为 SQL 的转换正在发生“之后ROW IS INSERTED”。因此,从根本上说,改变物化流中数据字段的类型转换在开始之前就失败了。

解决方案(这不是很优雅)是: 1 - 将类型转换逻辑或任何可能导致错误的 SQL 逻辑移动到触发器函数中 2 - 在触发器函数中创建一个“当其他人时例外”部分 3 -确保返回新的;转型成功和失败的两种情况都会发生。4 - 使连续转换仅作为不应用逻辑的传递,它只是调用触发器。(在这种情况下,它确实在某种程度上否定了使用 PipelineDB 来解决这个初始数据分段问题的全部意义。但是,我离题了。)

有了这个,我创建了一个表来捕获错误,并通过确保上面列出的所有 3 个步骤都得到实施,然后我们确保事务将成功。

这很重要,因为如果没有这样做并且“您在异常中得到异常”,或者您没有优雅地处理异常,则不会加载任何记录。

这支持了策略:我们只是试图创建一个数据处理“河中的叉子”,以一种方式将成功转换为一个表(或流表)的记录路由,将转换失败的记录路由为错误表。

下面我展示了一个 POC,我们将记录作为流处理并将它们具体化为物理表。(它也可能是另一个流)。实现这一目标的关键是:

错误表使用文本列 触发器函数捕获尝试转换中的错误,并将它们与系统返回的错误基本描述一起写入错误表。

我提到我不“喜欢”该解决方案,但这是我在几个小时内找到的最好的解决方案,以解决 PipelineDB 将事情作为触发器后插入的限制,因此插入失败不可能是捕获,并且 pipelinedb 没有内置的内在能力来处理: - 在失败时继续处理事务中的流 - 在行级别优雅地失败并提供更简单的机制来将失败的转换路由到错误表

DROP SCHEMA IF EXISTS pdb CASCADE;
CREATE SCHEMA IF NOT EXISTS pdb;


DROP TABLE IF EXISTS pdb.lis_final;
CREATE TABLE pdb.lis_final(
    edm___row_id bigint,
    edm___created_dtz timestamp with time zone DEFAULT current_timestamp,
    edm___updatedat_dtz timestamp with time zone DEFAULT current_timestamp,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt timestamp,
    samplereceived_dt timestamp,
    testperformed_dt timestamp,
    testresultsreleased_dt timestamp,
    extractedfromsourceat_dt timestamp,
    birthdate_d date
);

DROP TABLE IF EXISTS pdb.lis_errors;
CREATE TABLE pdb.lis_errors(
    edm___row_id bigint,
    edm___errorat_dtz timestamp with time zone default current_timestamp,
    edm___errormsg text,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt text,
    samplereceived_dt text,
    testperformed_dt text,
    testresultsreleased_dt text,
    extractedfromsourceat_dt text,
    birthdate_d text
);


DROP FOREIGN TABLE IF EXISTS pdb.lis_streaming_table CASCADE;
CREATE FOREIGN TABLE pdb.lis_streaming_table (
    edm___row_id serial,
    patient_id text,
    encounter_id text,
    order_id text,
    sample_id text,
    container_id text,
    result_id text,
    orderrequestcode text,
    orderrequestname text,
    testresultcode text,
    testresultname text,
    testresultcost text,
    testordered_dt text,
    samplereceived_dt text,
    testperformed_dt text,
    testresultsreleased_dt text,
    extractedfromsourceat_dt text,
    birthdate_d text
)
SERVER pipelinedb;


CREATE OR REPLACE FUNCTION insert_into_t()
  RETURNS trigger AS
  $$
  BEGIN

    INSERT INTO pdb.lis_final
    SELECT
        NEW.edm___row_id,
        current_timestamp as edm___created_dtz,
        current_timestamp as edm___updatedat_dtz,
        NEW.patient_id,
        NEW.encounter_id,
        NEW.order_id,
        NEW.sample_id,
        NEW.container_id,
        NEW.result_id,
        NEW.orderrequestcode,
        NEW.orderrequestname,
        NEW.testresultcode,
        NEW.testresultname,
        NEW.testresultcost,
        to_timestamp(NEW.testordered_dt,'YYYY/MM/DD HH24:MI:SS') as testordered_dt,
        to_timestamp(NEW.samplereceived_dt,'YYYY/MM/DD HH24:MI:SS') as samplereceived_dt,
        to_timestamp(NEW.testperformed_dt,'YYYY/MM/DD HH24:MI:SS') as testperformed_dt,
        to_timestamp(NEW.testresultsreleased_dt,'YYYY/MM/DD HH24:MI:SS') as testresultsreleased_dt,
        to_timestamp(NEW.extractedfromsourceat_dt,'YYYY/MM/DD HH24:MI:SS') as extractedfromsourceat_dt,
        to_date(NEW.birthdate_d,'YYYY/MM/DD') as birthdate_d;

    -- Return new as nothing happens
    RETURN NEW;

    EXCEPTION WHEN others THEN

        INSERT INTO pdb.lis_errors
        SELECT
            NEW.edm___row_id,
            current_timestamp as edm___errorat_dtz,
            SQLERRM as edm___errormsg,
            NEW.patient_id,
            NEW.encounter_id,
            NEW.order_id,
            NEW.sample_id,
            NEW.container_id,
            NEW.result_id,
            NEW.orderrequestcode,
            NEW.orderrequestname,
            NEW.testresultcode,
            NEW.testresultname,
            NEW.testresultcost,
            NEW.testordered_dt,
            NEW.samplereceived_dt,
            NEW.testperformed_dt,
            NEW.testresultsreleased_dt,
            NEW.extractedfromsourceat_dt,
            NEW.birthdate_d;

        -- Return new back to the streaming view as we don't want that process to error.  We already routed the record above to the errors table as text.
        RETURN NEW;

  END;
  $$
  LANGUAGE plpgsql;


DROP VIEW IF EXISTS pdb.lis_tryparse CASCADE;
CREATE VIEW pdb.lis_tryparse WITH (action=transform, outputfunc=insert_into_t) AS
SELECT
    edm___row_id,
    patient_id,
    encounter_id,
    order_id,
    sample_id,
    container_id,
    result_id,
    orderrequestcode,
    orderrequestname,
    testresultcode,
    testresultname,
    testresultcost,
    testordered_dt,
    samplereceived_dt,
    testperformed_dt,
    testresultsreleased_dt,
    extractedfromsourceat_dt,
    birthdate_d
FROM pdb.lis_streaming_table as st;
于 2019-05-16T13:59:01.973 回答