1

我需要一个流水线函数,它可以从 sys_refcursor 中返回一个表,其中 sys_refcursor 参数在编译时是未知的。

例如:

select * 
from  table(pipeline_func(cursor(select 1 col_1, 2 col_2 from dua)))

或者

select * 
from  table(pipeline_func(cursor(select 1 col_1, 2 col_2, 3 col_3 from dua))).

我已经阅读了 函数的返回结果集

并尝试修改代码,使其可以接受 sys_refcursor 作为参数而不是字符串 SQL 语句。但是错误发生在该行:

pkg_dynamic_pipeline.r_sql.execute := dbms_sql.execute(pkg_dynamic_pipeline.r_sql.cursor); 

错误消息:无效光标。

您能帮我解决错误或给我提示以解决我的问题吗?

以下是我的代码:

CREATE OR REPLACE type dynamic_pipeline as object
(
     atype anytype,
     static function ODCITableDescribe
     (
        rtype out anytype, 
        sql_cur  in out SYS_REFCURSOR
     )
     return number,

     static function ODCITablePrepare
     (
        sctx      out dynamic_pipeline,
        tf_info   in  sys.ODCITabfuncinfo,
        sql_cur  in out SYS_REFCURSOR
      )
      return number,

     static function ODCITableStart
     (
        sctx  in out dynamic_pipeline,
        sql_cur  in out SYS_REFCURSOR
     )
     return number,

     member function ODCITablefetch
     ( 
        self  in out dynamic_pipeline,
        nrows in     number,
        rws   out    anydataset
     )
     return number,

     member function ODCITableClose
     (
        self in dynamic_pipeline
     )
     return number
 );


CREATE OR REPLACE package ICMDB_OAA.pkg_dynamic_pipeline as
 /*
  * Global Types
  */
  -- Describe array.
  type dynamic_sql_rec is record
  (
        cursor    integer,
        column_cnt  pls_integer,
        description dbms_sql.desc_tab2,
        execute     integer
   );


   type anytype_metadata_rec is record
   (
        precision pls_integer,
        scale     pls_integer,
        length    pls_integer,
        csid      pls_integer,
        csfrm     pls_integer,
        schema    varchar2(30),
        type      anytype,
        name      varchar2(30),
        version   varchar2(30),
        attr_cnt  pls_integer,
        attr_type anytype,
        attr_name varchar2(128),
        typecode  pls_integer
    );

    /*
    * Global Variables
    */
    -- SQL descriptor.
     r_sql dynamic_sql_rec;

     /*
      * function will run the given SQL
      */
     /*
        function querydb(p_stmt in varchar2)
        return anydataset pipelined using dyn_pipeline;
     */

     function querydb(p_cursor in sys_refcursor)
     return anydataset pipelined using dynamic_pipeline;

end pkg_dynamic_pipeline;
/

CREATE OR REPLACE type body dynamic_pipeline
as
    /*
     * DESC step. this will be called at hard parse and will create
     * a physical type in the DB Schema based on the select columns.
     */
    static function ODCITableDescribe
    (
        rtype out anytype, 
        sql_cur  in out  SYS_REFCURSOR
    )
    return number
    is

      /* Variables */
      -- Type to hold the dbms_sql info (description)
      r_sql   pkg_dynamic_pipeline.dynamic_sql_rec;
       -- Type to create (has all the columns) of the sql query.
       t_anyt  anytype;
       -- SQL query that will be made up from the 2 passed in queries.
       -- v_sql   varchar2(32767);
        -- cur_local sys_refcursor;
     begin
            DBMS_OUTPUT.PUT_LINE('ODCITableDescribe');    

      /*
       * Parse the SQL and describe its format and structure.
       */
      -- v_sql := replace(stmt, ';', null);

      -- open, parse and discover all info about this SQL.
      -- r_sql.cursor := dbms_sql.open_cursor;
      -- dbms_sql.parse( r_sql.cursor, v_sql, dbms_sql.native );
      r_sql.cursor := DBMS_SQL.TO_CURSOR_NUMBER(sql_cur);
      dbms_sql.describe_columns2( r_sql.cursor, r_sql.column_cnt, r_sql.description );
      --dbms_sql.close_cursor( r_sql.cursor );

      -- Start to create the physical type.
      anytype.BeginCreate( DBMS_TYPES.TYPECODE_OBJECT, t_anyt );

      -- Loop through each attribute and add to the type.
      for i in 1 .. r_sql.column_cnt
      loop

        t_anyt.AddAttr(r_sql.description(i).col_name,
                       case
                         when r_sql.description(i).col_type in (1,96,11,208)
                         then dbms_types.typecode_varchar2
                        when r_sql.description(i).col_type = 2
                         then dbms_types.typecode_number
                         when r_sql.description(i).col_type in (8,112)
                         then dbms_types.typecode_clob
                         when r_sql.description(i).col_type = 12
                         then dbms_types.typecode_date
                         when r_sql.description(i).col_type = 23
                         then dbms_types.typecode_raw
                         when r_sql.description(i).col_type = 180
                         then dbms_types.typecode_timestamp
                         when r_sql.description(i).col_type = 181
                         then dbms_types.typecode_timestamp_tz
                         when r_sql.description(i).col_type = 182
                         then dbms_types.typecode_interval_ym
                         when r_sql.description(i).col_type = 183
                         then dbms_types.typecode_interval_ds
                         when r_sql.description(i).col_type = 231
                         then dbms_types.typecode_timestamp_ltz
                       end,
                       r_sql.description(i).col_precision,
                       r_sql.description(i).col_scale,
                       r_sql.description(i).col_max_len,
                       r_sql.description(i).col_charsetid,
                       r_sql.description(i).col_charsetform );
      end loop;

      t_anyt.EndCreate;

      -- set the output type to our built type.
      ANYTYPE.BeginCreate(dbms_types.TYPECODE_TABLE, rtype);
      rtype.SetInfo(null, null, null, null, null, t_anyt,
                    dbms_types.TYPECODE_OBJECT, 0);
      rtype.EndCreate();

      sql_cur:= DBMS_SQL.TO_REFCURSOR(r_sql.cursor);

      DBMS_OUTPUT.PUT_LINE('ODCITableDescribe End');  

      return ODCIConst.Success;

    end ODCITableDescribe;


    /*
     * PREPARE step. Initialise our type.
     */
   static function ODCITablePrepare
   (
        sctx      out dynamic_pipeline,
        tf_info   in  sys.ODCITabfuncinfo,
        sql_cur  in  out SYS_REFCURSOR
   )
       return number
     is

       /* Variables */
       -- Meta data.
       r_meta   pkg_dynamic_pipeline.anytype_metadata_rec;

     begin

       DBMS_OUTPUT.PUT_LINE('ODCITablePrepare');

       r_meta.typecode := tf_info.rettype.getattreleminfo(
                           1, r_meta.precision, r_meta.scale, r_meta.length,
                           r_meta.csid, r_meta.csfrm, r_meta.type, r_meta.name
                         );

      sctx := dynamic_pipeline(r_meta.type);

      DBMS_OUTPUT.PUT_LINE('ODCITablePrepare End');

      return odciconst.success;

    end;


    /*
    * START step. this is where we execute the cursor prior to fetching from it.
     */
   static function ODCITableStart
   (
        sctx  in out dynamic_pipeline,
        sql_cur  in out SYS_REFCURSOR
   )
      return number
    is

      /* Variables */
      r_meta pkg_dynamic_pipeline.anytype_metadata_rec;
      --v_sql varchar2(32767);
      --cur_local sys_refcursor;
    begin

      DBMS_OUTPUT.PUT_LINE('ODCITableStart');

      --cur_local:=sql_cur;

      --v_sql := replace(stmt, ';', null);
      --pkg_dynamic_pipeline.r_sql.cursor := dbms_sql.open_cursor;
      pkg_dynamic_pipeline.r_sql.cursor := DBMS_SQL.TO_CURSOR_NUMBER(sql_cur);
      --dbms_sql.parse(pkg_dynamic_pipeline.r_sql.cursor, v_sql, dbms_sql.native);
      dbms_sql.describe_columns2(pkg_dynamic_pipeline.r_sql.cursor,
                                 pkg_dynamic_pipeline.r_sql.column_cnt,
                                 pkg_dynamic_pipeline.r_sql.description);

      -- define all the columns found to let Oracle know the datatypes.
      DBMS_OUTPUT.PUT_LINE('ODCITableStart. column_cnt['||pkg_dynamic_pipeline.r_sql.column_cnt||']');
      for i in 1..pkg_dynamic_pipeline.r_sql.column_cnt
      loop

        r_meta.typecode := sctx.atype.GetAttrElemInfo(
                             i, r_meta.precision, r_meta.scale, r_meta.length,
                             r_meta.csid, r_meta.csfrm, r_meta.type, r_meta.name
                           );

        case r_meta.typecode
          when dbms_types.typecode_varchar2
          then
            dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, '', 32767);
          when dbms_types.typecode_number
          then
            dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as number));
          when dbms_types.typecode_date
          then
            dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as date));
          when dbms_types.typecode_raw
          then
            dbms_sql.define_column_raw(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as raw), r_meta.length);
          when dbms_types.typecode_timestamp
          then
            dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as timestamp));
          when dbms_types.typecode_timestamp_tz
          then
            dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as timestamp with time zone));
          when dbms_types.typecode_timestamp_ltz
          then
            dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as timestamp with local time zone));
          when dbms_types.typecode_interval_ym
          then
            dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as interval year to month));
          when dbms_types.typecode_interval_ds
          then
            dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as interval day to second));
          when dbms_types.typecode_clob
          then
            case pkg_dynamic_pipeline.r_sql.description(i).col_type
              when 8
              then
                dbms_sql.define_column_long(pkg_dynamic_pipeline.r_sql.cursor, i);
              else
                dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as clob));
            end case;
        end case;
      end loop;

      -- execute the SQL.
      pkg_dynamic_pipeline.r_sql.execute := dbms_sql.execute(pkg_dynamic_pipeline.r_sql.cursor);

      sql_cur:= DBMS_SQL.TO_REFCURSOR(pkg_dynamic_pipeline.r_sql.cursor);

      DBMS_OUTPUT.PUT_LINE('ODCITableStart End');

      return odciconst.success;

    end ODCITablestart;

    /*
     * FETCH step.
     */
    member function ODCITablefetch
    (
            self   in out dynamic_pipeline,
            nrows  in     number,
            rws    out    anydataset
     )
     return number
    is
      /* Variables */
      -- Buffers to hold values.
      v_vc_col       varchar2(32767);
      v_num_col      number;
      v_date_col     date;
      v_raw_col      raw(32767);
      v_raw_error    number;
      v_raw_len      integer;
      v_int_ds_col   interval day to second;
      v_int_ym_col   interval year to month;
      v_ts_col       timestamp;
      v_tstz_col     timestamp with time zone;
      v_tsltz_col    timestamp with local time zone;
      v_clob_col     clob;
      v_clob_offset  integer := 0;
      v_clob_len     integer;
      -- Metadata
      r_meta  pkg_dynamic_pipeline.anytype_metadata_rec;

    begin

        DBMS_OUTPUT.PUT_LINE('ODCITablefetch');

      if dbms_sql.fetch_rows( pkg_dynamic_pipeline.r_sql.cursor ) > 0
      then

        -- Describe to get number and types of columns.
        r_meta.typecode := self.atype.getinfo(
                             r_meta.precision, r_meta.scale, r_meta.length,
                             r_meta.csid, r_meta.csfrm, r_meta.schema,
                             r_meta.name, r_meta.version, r_meta.attr_cnt
                           );

        anydataset.begincreate(dbms_types.typecode_object, self.atype, rws);
        rws.addinstance();
        rws.piecewise();

        -- loop through each column extracting value.
        for i in 1..pkg_dynamic_pipeline.r_sql.column_cnt
        loop

          r_meta.typecode := self.atype.getattreleminfo(
                               i, r_meta.precision, r_meta.scale, r_meta.length,
                               r_meta.csid, r_meta.csfrm, r_meta.attr_type,
                               r_meta.attr_name
                             );

          case r_meta.typecode
            when dbms_types.typecode_varchar2
            then
              dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_vc_col);
              rws.setvarchar2(v_vc_col);
            when dbms_types.typecode_number
            then
              dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_num_col);
              rws.setnumber(v_num_col);
            when dbms_types.typecode_date
            then
              dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_date_col);
              rws.setdate(v_date_col);
            when dbms_types.typecode_raw
            then
              dbms_sql.column_value_raw(pkg_dynamic_pipeline.r_sql.cursor, i, v_raw_col,
                 v_raw_error, v_raw_len);
              rws.setraw(v_raw_col);
            when dbms_types.typecode_interval_ds
            then
              dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_int_ds_col);
              rws.setintervalds(v_int_ds_col);
            when dbms_types.typecode_interval_ym
            then
              dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_int_ym_col);
              rws.setintervalym(v_int_ym_col);
            when dbms_types.typecode_timestamp
            then
              dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_ts_col);
              rws.settimestamp(v_ts_col);
            when dbms_types.typecode_timestamp_tz
            then
              dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_tstz_col);
              rws.settimestamptz(v_tstz_col);
           when dbms_types.typecode_timestamp_ltz
           then
              dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_tsltz_col);
              rws.settimestampltz(v_tsltz_col);
           when dbms_types.typecode_clob
           then
             case pkg_dynamic_pipeline.r_sql.description(i).col_type
               when 8
               then
                 loop
                   dbms_sql.column_value_long(pkg_dynamic_pipeline.r_sql.cursor, i, 32767, v_clob_offset,
                                              v_vc_col, v_clob_len);
                   v_clob_col := v_clob_col || v_vc_col;
                   v_clob_offset := v_clob_offset + 32767;
                   exit when v_clob_len < 32767;
                 end loop;
               else
                 dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_clob_col);
             end case;
             rws.setclob(v_clob_col);
          end case;
        end loop;

        rws.endcreate();

      end if;

      DBMS_OUTPUT.PUT_LINE('ODCITablefetch End');

      return ODCIConst.Success;

    end;

    /*
     * CLOSE step. close the cursor.
     */
    member function ODCITableClose(self in dynamic_pipeline)
      return number
    is
    begin
      DBMS_OUTPUT.PUT_LINE('ODCITableClose');
      dbms_sql.close_cursor( pkg_dynamic_pipeline.r_sql.cursor );
      pkg_dynamic_pipeline.r_sql := null;
      DBMS_OUTPUT.PUT_LINE('ODCITableClose End');
      return odciconst.success;
    end ODCITableClose;

  end;

/
4

1 回答 1

0

注释掉 DYNAMIC_PIPELINE 类型主体中的第 206 和 208 行,它将起作用:

--pkg_dynamic_pipeline.r_sql.execute := dbms_sql.execute(pkg_dynamic_pipeline.r_sql.cursor);

--sql_cur:= DBMS_SQL.TO_REFCURSOR(pkg_dynamic_pipeline.r_sql.cursor);

例如:

SQL> select * from table(pkg_dynamic_pipeline.querydb(cursor(select 1 col_1, 2 col_2 from dual)));

     COL_1      COL_2
---------- ----------
         1          2

在测试修复之前,运行下面的块来清除包变量并强制硬解析:

begin
    dbms_session.reset_package;
    execute immediate 'alter system flush shared_pool';
end;
/

您可能希望基于我的Method4 开源程序而不是其他 Stackoverflow 答案来构建您的代码。两个版本都基于 Adrian Billington 的原始版本,但我添加了许多有用的类型修复。例如,原始版本不适用于 NVARCHAR2、float、30 字节列名等。

此外,如果有充分的理由使用游标而不是字符串,并且您认为它可能对其他人有帮助,我愿意在我的项目中添加一个游标版本。

于 2016-03-20T21:00:20.293 回答