0

我需要实现从 PostgreSQL 数据库到 Oracle 数据库的更改数据捕获 (CDC)。

由于 PostgreSQL 的 CDC 没有日志化知识模块,我正在尝试调整 JKM Oracle Simple,如https://forums.oracle.com/forums/thread.jspa?threadID=620355中所述。

但是,我在使用 Jython“创建触发器”命令时遇到了问题。

在 ODI 中,我已将“创建触发器”命令替换为以下内容:

drop trigger if exists public_t_payment on public.payment;

drop sequence if exists idSequence;

CREATE SEQUENCE idSequence;

CREATE OR REPLACE FUNCTION public_t_payment_trigger() RETURNS TRIGGER AS $$
declare
    V_FLAG  VARCHAR(1);
    V_id    integer NOT NULL DEFAULT nextval('idSequence');
begin
    if inserting then
        V_id := NEW.id;
        V_FLAG := 'I';
    end if;

    if updating then
        V_id := NEW.id;
        V_FLAG := 'I';
    end if;

    if deleting then
        V_id := OLD.id;
        V_FLAG := 'D';
    end if;

    insert into public.j$payment
    (
        JRN_SUBSCRIBER,
        JRN_CONSUMED,
        JRN_FLAG,
        JRN_DATE,
        id
    )
    select  JRN_SUBSCRIBER,
        '0',
        V_FLAG,
        sysdate,
        V_id
    from    public."SNP_SUBSCRIBERS"
    where   JRN_TNAME = 'public.payment';
    /* The following line can be uncommented for symetric replication */
    /* and  upper(USER) <> upper(''postgres'') */
end; $$ LANGUAGE plpgsql;

create trigger public_t_payment
after insert or update or delete on public.payment
for each row
execute procedure public_t_payment_trigger();

上面的代码在 PostgreSQL 上复制和执行时效果很好,但是当我在源表上执行“启动日志”时,ODI 给了我以下错误:

ODI-1217: Session payment (712013) fails with return code 7000.
ODI-1226: Step payment fails after 1 attempt(s).
ODI-1231: An error occurred while performing a Journal operation on datastore payment.
Caused By: org.apache.bsf.BSFException: exception from Jython:
SyntaxError: ("no viable alternative at character '$'", ('<string>', 6, 19, 'returns trigger as $test\n'))

问题似乎与触发器的返回“as”名称($$)有关,但我不知道如何在 Jython 中解决这个问题。

4

1 回答 1

1

我设法解决了这个问题。

“创建触发器”命令的技术设置为 Jython,但我的代码纯粹是 PostgreSQL。

将技术下拉列表从“Jython”更改为“PostgreSQL”允许执行创建触发器命令而不会出现任何错误。

但是,由于我希望使命令尽可能与原始命令相似,因此我更新了上述代码以包含必要的 Jython 语法。

请注意,尽管上述问题中发布的 SQL 在 PostgreSQL 中执行,但它并不完全正确,因为在触发触发器时它仍然会出错。

我已将在 PostgreSQL 源数据库上执行的 Create Trigger 命令的完整 Jython 代码粘贴在下面,也许有人会发现它很有用,因为 Oracle JKM 没有 PostgreSQL。

triggerCmd = """
drop trigger if exists "<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>" on <%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>;

drop sequence if exists "seq_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>_id";

create sequence "seq_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>_id";

create or replace function "fn_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"()
returns trigger as $$
declare
    V_FLAG  VARCHAR(1);
    V_id integer NOT NULL DEFAULT nextval('"seq_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>_id"');
begin
    if (TG_OP = 'INSERT') then
        <%=odiRef.getColList("", "V_[COL_NAME] := new.[COL_NAME];", "\n\t\t", "", "PK")%>
        V_FLAG := 'I';
    end if;

    if (TG_OP = 'UPDATE') then
        <%=odiRef.getColList("", "V_[COL_NAME] := new.[COL_NAME];", "\n\t\t", "", "PK")%>
        V_FLAG := 'I';
    end if;

    if (TG_OP = 'DELETE') then
        <%=odiRef.getColList("", "V_[COL_NAME] := old.[COL_NAME];", "\n\t\t", "", "PK")%>
        V_FLAG := 'D';
    end if;

    insert into  <%=odiRef.getJrnInfo("JRN_FULL_NAME")%>
    (
        JRN_SUBSCRIBER,
        JRN_CONSUMED,
        JRN_FLAG,
        JRN_DATE,
        <%=odiRef.getColList("", "[COL_NAME]", ",\n\t\t", "", "PK")%>
    )
    select  JRN_SUBSCRIBER,
        '0',
        V_FLAG,
        now(),
        <%=odiRef.getColList("", "V_[COL_NAME]", ",\n\t\t", "", "PK")%>
    from    <%=odiRef.getJrnInfo("SNP_JRN_SUBSCRIBER")%>
    where   JRN_TNAME = '<%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>';
    /* The following line can be uncommented for symetric replication */
    /* and  upper(USER) <> upper('<%=odiRef.getInfo("DEST_USER_NAME")%>') */
return new;
end $$ language plpgsql;

create trigger "<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"
after insert or update or delete on <%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>
for each row    
execute procedure "fn_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"();
"""

# Create the statement
myStmt = myCon.createStatement()

# Execute the trigger creation
myStmt.execute(triggerCmd)

myStmt.close()
myStmt = None

# Commit, just in case
# myCon.commit()
于 2013-01-08T09:38:48.627 回答