我需要一个流水线函数,它可以从 sys_refcursor 中返回一个表,其中 sys_refcursor 参数在编译时是未知的。
例如:
select *
from table(pipeline_func(cursor(select 1 col_1, 2 col_2 from dua)))
或者
select *
from table(pipeline_func(cursor(select 1 col_1, 2 col_2, 3 col_3 from dua))).
我已经阅读了 函数的返回结果集
并尝试修改代码,使其可以接受 sys_refcursor 作为参数而不是字符串 SQL 语句。但是错误发生在该行:
pkg_dynamic_pipeline.r_sql.execute := dbms_sql.execute(pkg_dynamic_pipeline.r_sql.cursor);
错误消息:无效光标。
您能帮我解决错误或给我提示以解决我的问题吗?
以下是我的代码:
CREATE OR REPLACE type dynamic_pipeline as object
(
atype anytype,
static function ODCITableDescribe
(
rtype out anytype,
sql_cur in out SYS_REFCURSOR
)
return number,
static function ODCITablePrepare
(
sctx out dynamic_pipeline,
tf_info in sys.ODCITabfuncinfo,
sql_cur in out SYS_REFCURSOR
)
return number,
static function ODCITableStart
(
sctx in out dynamic_pipeline,
sql_cur in out SYS_REFCURSOR
)
return number,
member function ODCITablefetch
(
self in out dynamic_pipeline,
nrows in number,
rws out anydataset
)
return number,
member function ODCITableClose
(
self in dynamic_pipeline
)
return number
);
CREATE OR REPLACE package ICMDB_OAA.pkg_dynamic_pipeline as
/*
* Global Types
*/
-- Describe array.
type dynamic_sql_rec is record
(
cursor integer,
column_cnt pls_integer,
description dbms_sql.desc_tab2,
execute integer
);
type anytype_metadata_rec is record
(
precision pls_integer,
scale pls_integer,
length pls_integer,
csid pls_integer,
csfrm pls_integer,
schema varchar2(30),
type anytype,
name varchar2(30),
version varchar2(30),
attr_cnt pls_integer,
attr_type anytype,
attr_name varchar2(128),
typecode pls_integer
);
/*
* Global Variables
*/
-- SQL descriptor.
r_sql dynamic_sql_rec;
/*
* function will run the given SQL
*/
/*
function querydb(p_stmt in varchar2)
return anydataset pipelined using dyn_pipeline;
*/
function querydb(p_cursor in sys_refcursor)
return anydataset pipelined using dynamic_pipeline;
end pkg_dynamic_pipeline;
/
CREATE OR REPLACE type body dynamic_pipeline
as
/*
* DESC step. this will be called at hard parse and will create
* a physical type in the DB Schema based on the select columns.
*/
static function ODCITableDescribe
(
rtype out anytype,
sql_cur in out SYS_REFCURSOR
)
return number
is
/* Variables */
-- Type to hold the dbms_sql info (description)
r_sql pkg_dynamic_pipeline.dynamic_sql_rec;
-- Type to create (has all the columns) of the sql query.
t_anyt anytype;
-- SQL query that will be made up from the 2 passed in queries.
-- v_sql varchar2(32767);
-- cur_local sys_refcursor;
begin
DBMS_OUTPUT.PUT_LINE('ODCITableDescribe');
/*
* Parse the SQL and describe its format and structure.
*/
-- v_sql := replace(stmt, ';', null);
-- open, parse and discover all info about this SQL.
-- r_sql.cursor := dbms_sql.open_cursor;
-- dbms_sql.parse( r_sql.cursor, v_sql, dbms_sql.native );
r_sql.cursor := DBMS_SQL.TO_CURSOR_NUMBER(sql_cur);
dbms_sql.describe_columns2( r_sql.cursor, r_sql.column_cnt, r_sql.description );
--dbms_sql.close_cursor( r_sql.cursor );
-- Start to create the physical type.
anytype.BeginCreate( DBMS_TYPES.TYPECODE_OBJECT, t_anyt );
-- Loop through each attribute and add to the type.
for i in 1 .. r_sql.column_cnt
loop
t_anyt.AddAttr(r_sql.description(i).col_name,
case
when r_sql.description(i).col_type in (1,96,11,208)
then dbms_types.typecode_varchar2
when r_sql.description(i).col_type = 2
then dbms_types.typecode_number
when r_sql.description(i).col_type in (8,112)
then dbms_types.typecode_clob
when r_sql.description(i).col_type = 12
then dbms_types.typecode_date
when r_sql.description(i).col_type = 23
then dbms_types.typecode_raw
when r_sql.description(i).col_type = 180
then dbms_types.typecode_timestamp
when r_sql.description(i).col_type = 181
then dbms_types.typecode_timestamp_tz
when r_sql.description(i).col_type = 182
then dbms_types.typecode_interval_ym
when r_sql.description(i).col_type = 183
then dbms_types.typecode_interval_ds
when r_sql.description(i).col_type = 231
then dbms_types.typecode_timestamp_ltz
end,
r_sql.description(i).col_precision,
r_sql.description(i).col_scale,
r_sql.description(i).col_max_len,
r_sql.description(i).col_charsetid,
r_sql.description(i).col_charsetform );
end loop;
t_anyt.EndCreate;
-- set the output type to our built type.
ANYTYPE.BeginCreate(dbms_types.TYPECODE_TABLE, rtype);
rtype.SetInfo(null, null, null, null, null, t_anyt,
dbms_types.TYPECODE_OBJECT, 0);
rtype.EndCreate();
sql_cur:= DBMS_SQL.TO_REFCURSOR(r_sql.cursor);
DBMS_OUTPUT.PUT_LINE('ODCITableDescribe End');
return ODCIConst.Success;
end ODCITableDescribe;
/*
* PREPARE step. Initialise our type.
*/
static function ODCITablePrepare
(
sctx out dynamic_pipeline,
tf_info in sys.ODCITabfuncinfo,
sql_cur in out SYS_REFCURSOR
)
return number
is
/* Variables */
-- Meta data.
r_meta pkg_dynamic_pipeline.anytype_metadata_rec;
begin
DBMS_OUTPUT.PUT_LINE('ODCITablePrepare');
r_meta.typecode := tf_info.rettype.getattreleminfo(
1, r_meta.precision, r_meta.scale, r_meta.length,
r_meta.csid, r_meta.csfrm, r_meta.type, r_meta.name
);
sctx := dynamic_pipeline(r_meta.type);
DBMS_OUTPUT.PUT_LINE('ODCITablePrepare End');
return odciconst.success;
end;
/*
* START step. this is where we execute the cursor prior to fetching from it.
*/
static function ODCITableStart
(
sctx in out dynamic_pipeline,
sql_cur in out SYS_REFCURSOR
)
return number
is
/* Variables */
r_meta pkg_dynamic_pipeline.anytype_metadata_rec;
--v_sql varchar2(32767);
--cur_local sys_refcursor;
begin
DBMS_OUTPUT.PUT_LINE('ODCITableStart');
--cur_local:=sql_cur;
--v_sql := replace(stmt, ';', null);
--pkg_dynamic_pipeline.r_sql.cursor := dbms_sql.open_cursor;
pkg_dynamic_pipeline.r_sql.cursor := DBMS_SQL.TO_CURSOR_NUMBER(sql_cur);
--dbms_sql.parse(pkg_dynamic_pipeline.r_sql.cursor, v_sql, dbms_sql.native);
dbms_sql.describe_columns2(pkg_dynamic_pipeline.r_sql.cursor,
pkg_dynamic_pipeline.r_sql.column_cnt,
pkg_dynamic_pipeline.r_sql.description);
-- define all the columns found to let Oracle know the datatypes.
DBMS_OUTPUT.PUT_LINE('ODCITableStart. column_cnt['||pkg_dynamic_pipeline.r_sql.column_cnt||']');
for i in 1..pkg_dynamic_pipeline.r_sql.column_cnt
loop
r_meta.typecode := sctx.atype.GetAttrElemInfo(
i, r_meta.precision, r_meta.scale, r_meta.length,
r_meta.csid, r_meta.csfrm, r_meta.type, r_meta.name
);
case r_meta.typecode
when dbms_types.typecode_varchar2
then
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, '', 32767);
when dbms_types.typecode_number
then
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as number));
when dbms_types.typecode_date
then
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as date));
when dbms_types.typecode_raw
then
dbms_sql.define_column_raw(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as raw), r_meta.length);
when dbms_types.typecode_timestamp
then
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as timestamp));
when dbms_types.typecode_timestamp_tz
then
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as timestamp with time zone));
when dbms_types.typecode_timestamp_ltz
then
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as timestamp with local time zone));
when dbms_types.typecode_interval_ym
then
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as interval year to month));
when dbms_types.typecode_interval_ds
then
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as interval day to second));
when dbms_types.typecode_clob
then
case pkg_dynamic_pipeline.r_sql.description(i).col_type
when 8
then
dbms_sql.define_column_long(pkg_dynamic_pipeline.r_sql.cursor, i);
else
dbms_sql.define_column(pkg_dynamic_pipeline.r_sql.cursor, i, cast(null as clob));
end case;
end case;
end loop;
-- execute the SQL.
pkg_dynamic_pipeline.r_sql.execute := dbms_sql.execute(pkg_dynamic_pipeline.r_sql.cursor);
sql_cur:= DBMS_SQL.TO_REFCURSOR(pkg_dynamic_pipeline.r_sql.cursor);
DBMS_OUTPUT.PUT_LINE('ODCITableStart End');
return odciconst.success;
end ODCITablestart;
/*
* FETCH step.
*/
member function ODCITablefetch
(
self in out dynamic_pipeline,
nrows in number,
rws out anydataset
)
return number
is
/* Variables */
-- Buffers to hold values.
v_vc_col varchar2(32767);
v_num_col number;
v_date_col date;
v_raw_col raw(32767);
v_raw_error number;
v_raw_len integer;
v_int_ds_col interval day to second;
v_int_ym_col interval year to month;
v_ts_col timestamp;
v_tstz_col timestamp with time zone;
v_tsltz_col timestamp with local time zone;
v_clob_col clob;
v_clob_offset integer := 0;
v_clob_len integer;
-- Metadata
r_meta pkg_dynamic_pipeline.anytype_metadata_rec;
begin
DBMS_OUTPUT.PUT_LINE('ODCITablefetch');
if dbms_sql.fetch_rows( pkg_dynamic_pipeline.r_sql.cursor ) > 0
then
-- Describe to get number and types of columns.
r_meta.typecode := self.atype.getinfo(
r_meta.precision, r_meta.scale, r_meta.length,
r_meta.csid, r_meta.csfrm, r_meta.schema,
r_meta.name, r_meta.version, r_meta.attr_cnt
);
anydataset.begincreate(dbms_types.typecode_object, self.atype, rws);
rws.addinstance();
rws.piecewise();
-- loop through each column extracting value.
for i in 1..pkg_dynamic_pipeline.r_sql.column_cnt
loop
r_meta.typecode := self.atype.getattreleminfo(
i, r_meta.precision, r_meta.scale, r_meta.length,
r_meta.csid, r_meta.csfrm, r_meta.attr_type,
r_meta.attr_name
);
case r_meta.typecode
when dbms_types.typecode_varchar2
then
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_vc_col);
rws.setvarchar2(v_vc_col);
when dbms_types.typecode_number
then
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_num_col);
rws.setnumber(v_num_col);
when dbms_types.typecode_date
then
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_date_col);
rws.setdate(v_date_col);
when dbms_types.typecode_raw
then
dbms_sql.column_value_raw(pkg_dynamic_pipeline.r_sql.cursor, i, v_raw_col,
v_raw_error, v_raw_len);
rws.setraw(v_raw_col);
when dbms_types.typecode_interval_ds
then
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_int_ds_col);
rws.setintervalds(v_int_ds_col);
when dbms_types.typecode_interval_ym
then
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_int_ym_col);
rws.setintervalym(v_int_ym_col);
when dbms_types.typecode_timestamp
then
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_ts_col);
rws.settimestamp(v_ts_col);
when dbms_types.typecode_timestamp_tz
then
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_tstz_col);
rws.settimestamptz(v_tstz_col);
when dbms_types.typecode_timestamp_ltz
then
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_tsltz_col);
rws.settimestampltz(v_tsltz_col);
when dbms_types.typecode_clob
then
case pkg_dynamic_pipeline.r_sql.description(i).col_type
when 8
then
loop
dbms_sql.column_value_long(pkg_dynamic_pipeline.r_sql.cursor, i, 32767, v_clob_offset,
v_vc_col, v_clob_len);
v_clob_col := v_clob_col || v_vc_col;
v_clob_offset := v_clob_offset + 32767;
exit when v_clob_len < 32767;
end loop;
else
dbms_sql.column_value(pkg_dynamic_pipeline.r_sql.cursor, i, v_clob_col);
end case;
rws.setclob(v_clob_col);
end case;
end loop;
rws.endcreate();
end if;
DBMS_OUTPUT.PUT_LINE('ODCITablefetch End');
return ODCIConst.Success;
end;
/*
* CLOSE step. close the cursor.
*/
member function ODCITableClose(self in dynamic_pipeline)
return number
is
begin
DBMS_OUTPUT.PUT_LINE('ODCITableClose');
dbms_sql.close_cursor( pkg_dynamic_pipeline.r_sql.cursor );
pkg_dynamic_pipeline.r_sql := null;
DBMS_OUTPUT.PUT_LINE('ODCITableClose End');
return odciconst.success;
end ODCITableClose;
end;
/