我正在处理从一个数据库到另一个数据库的 AQ 传播,但是当我安排传播并将第一条消息排队到 LOCAL AQ 表时,我在 DBA_QUEUE_SCHEDULES.LAST_ERROR_MSG 中遇到错误,“ORA-25215:user_data 类型和队列类型不匹配”。请注意,AQ 表中使用的两种对象类型是相同的,出于测试目的,我使用的是这个:
-- Test payload type for the AQ queue tables: a single text attribute.
CREATE OR REPLACE TYPE LOCAL_OBJ_MSG AS OBJECT (
    test VARCHAR2(4000)
)
/
两个 AQ 表也是相同的:我用同一个脚本创建它们,只改了名称,一个叫 LOCAL,另一个叫 REMOTE。LOCAL AQ 表在 LOCAL 数据库中,REMOTE AQ 表则在 SCHEMA_NAME.REMOTE 数据库中。
这是我用来创建 AQ 表的脚本:
-- Stop queue REMOTE_iTEST ahead of dropping it.
-- A missing queue (ORA-24010) is tolerated; any other error propagates.
DECLARE
    e_queue_missing EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_queue_missing, -24010);
BEGIN
    DBMS_AQADM.STOP_QUEUE(queue_name => 'REMOTE_iTEST');
EXCEPTION
    WHEN e_queue_missing THEN
        NULL;  -- nothing to stop
END;
/
-- Drop queue REMOTE_iTEST.
-- A missing queue (ORA-24010) is tolerated; any other error propagates.
DECLARE
    e_queue_missing EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_queue_missing, -24010);
BEGIN
    DBMS_AQADM.DROP_QUEUE(queue_name => 'REMOTE_iTEST');
EXCEPTION
    WHEN e_queue_missing THEN
        NULL;  -- nothing to drop
END;
/
-- Force-drop queue table REMOTE_iTEST.
-- A missing queue table (ORA-24002) is tolerated; any other error propagates.
DECLARE
    e_queue_table_missing EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_queue_table_missing, -24002);
BEGIN
    DBMS_AQADM.DROP_QUEUE_TABLE(
        queue_table => 'REMOTE_iTEST',
        force       => TRUE
    );
EXCEPTION
    WHEN e_queue_table_missing THEN
        NULL;  -- nothing to drop
END;
/
-- Stop queue REMOTE_oTEST ahead of dropping it.
-- A missing queue (ORA-24010) is tolerated; any other error propagates.
DECLARE
    e_queue_missing EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_queue_missing, -24010);
BEGIN
    DBMS_AQADM.STOP_QUEUE(queue_name => 'REMOTE_oTEST');
EXCEPTION
    WHEN e_queue_missing THEN
        NULL;  -- nothing to stop
END;
/
-- Drop queue REMOTE_oTEST.
-- A missing queue (ORA-24010) is tolerated; any other error propagates.
DECLARE
    e_queue_missing EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_queue_missing, -24010);
BEGIN
    DBMS_AQADM.DROP_QUEUE(queue_name => 'REMOTE_oTEST');
EXCEPTION
    WHEN e_queue_missing THEN
        NULL;  -- nothing to drop
END;
/
-- Force-drop queue table REMOTE_oTEST.
-- A missing queue table (ORA-24002) is tolerated; any other error propagates.
DECLARE
    e_queue_table_missing EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_queue_table_missing, -24002);
BEGIN
    DBMS_AQADM.DROP_QUEUE_TABLE(
        queue_table => 'REMOTE_oTEST',
        force       => TRUE
    );
EXCEPTION
    WHEN e_queue_table_missing THEN
        NULL;  -- nothing to drop
END;
/
-- Drop the payload type LOCAL_OBJ_MSG, but only when the current schema owns
-- an object of type TYPE with that name.
DECLARE
    v_type_count PLS_INTEGER;
BEGIN
    SELECT COUNT(*)
      INTO v_type_count
      FROM user_objects o
     WHERE o.object_name = UPPER('LOCAL_OBJ_MSG')
       AND o.object_type = 'TYPE';

    IF v_type_count > 0 THEN
        EXECUTE IMMEDIATE 'drop type LOCAL_OBJ_MSG';
    END IF;
END;
/
-- Payload type used by both queue tables created below: a single text attribute.
CREATE OR REPLACE TYPE LOCAL_OBJ_MSG AS OBJECT (
    test VARCHAR2(4000)
)
/
-- Create the multi-consumer queue table REMOTE_iTEST carrying LOCAL_OBJ_MSG payloads.
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'REMOTE_iTEST',
        queue_payload_type => 'LOCAL_OBJ_MSG',
        multiple_consumers => TRUE,
        message_grouping   => DBMS_AQADM.NONE,
        sort_list          => 'ENQ_TIME',
        storage_clause     => 'pctfree 5 pctused 90 tablespace SEPA_INTEG_AQ',
        comment            => 'Incoming TEST message table.'
    );
END;
/
-- Create the normal queue REMOTE_iTEST inside the queue table of the same name.
BEGIN
    DBMS_AQADM.CREATE_QUEUE(
        queue_name     => 'REMOTE_iTEST',
        queue_table    => 'REMOTE_iTEST',
        queue_type     => SYS.DBMS_AQADM.NORMAL_QUEUE,
        max_retries    => 5,
        retention_time => SYS.DBMS_AQADM.INFINITE,
        comment        => 'Incoming TEST messages.'
    );
END;
/
-- Enable both enqueue and dequeue on REMOTE_iTEST.
BEGIN
    DBMS_AQADM.START_QUEUE(
        queue_name => 'REMOTE_iTEST',
        enqueue    => TRUE,
        dequeue    => TRUE
    );
END;
/
-- Create the multi-consumer queue table REMOTE_oTEST carrying LOCAL_OBJ_MSG payloads.
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'REMOTE_oTEST',
        queue_payload_type => 'LOCAL_OBJ_MSG',
        multiple_consumers => TRUE,
        message_grouping   => DBMS_AQADM.NONE,
        sort_list          => 'ENQ_TIME',
        storage_clause     => 'pctfree 5 pctused 90 tablespace SEPA_INTEG_AQ',
        comment            => 'Outgoing TEST message table.'
    );
END;
/
-- Create the normal queue REMOTE_oTEST inside the queue table of the same name.
BEGIN
    DBMS_AQADM.CREATE_QUEUE(
        queue_name     => 'REMOTE_oTEST',
        queue_table    => 'REMOTE_oTEST',
        queue_type     => SYS.DBMS_AQADM.NORMAL_QUEUE,
        max_retries    => 5,
        retention_time => SYS.DBMS_AQADM.INFINITE,
        comment        => 'Outgoing TEST messages.'
    );
END;
/
-- Enable both enqueue and dequeue on REMOTE_oTEST.
BEGIN
    DBMS_AQADM.START_QUEUE(
        queue_name => 'REMOTE_oTEST',
        enqueue    => TRUE,
        dequeue    => TRUE
    );
END;
/
以下是用于创建订阅者、出队过程等的脚本:
本地数据库:
-- Wire LOCAL_oTEST to the remote queue: register a subscriber whose address
-- is the remote queue over the database link, then schedule queue-to-queue
-- propagation to that same destination queue.
DECLARE
    l_remote_agent sys.aq$_agent :=
        sys.aq$_agent(name     => 'LOCAL_oTEST_subscriber',
                      address  => 'SCHEMA_NAME.REMOTE_oTEST@DB_LINK_NAME',
                      protocol => 0);
BEGIN
    -- Subscriber that points at the remote queue.
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'LOCAL_oTEST',
        subscriber     => l_remote_agent,
        queue_to_queue => TRUE
    );

    -- Propagation schedule for that destination, with latency 0.
    DBMS_AQADM.SCHEDULE_PROPAGATION(
        queue_name        => 'LOCAL_oTEST',
        destination       => 'DB_LINK_NAME',
        destination_queue => 'SCHEMA_NAME.REMOTE_oTEST',
        latency           => 0
    );
END;
/
远程数据库:
-- Demo table for received messages: one row per message, with the receive
-- time defaulting to the moment of insert.
CREATE TABLE sepa_omsg_aq_demo (
    received TIMESTAMP DEFAULT SYSTIMESTAMP,
    message  LOCAL_OBJ_MSG
);
-- AQ notification callback: dequeues the message named in the descriptor and
-- stores its payload in sepa_omsg_aq_demo.
--
-- Parameters:
--   context  - not used by this procedure.
--   reginfo  - not used by this procedure.
--   descr    - supplies the queue name, consumer name and message id used
--              for the dequeue.
--   payload  - not used by this procedure; the payload is obtained by dequeuing.
--   payloadl - not used by this procedure.
--
-- Errors: any failure rolls back the dequeue and the insert and is then
-- re-raised, so the failure is visible instead of being silently discarded.
create or replace
procedure REMOTE_CALLBACK_TEST
(
    context  raw,
    reginfo  sys.aq$_reg_info,
    descr    sys.aq$_descriptor,
    payload  raw,
    payloadl number
)
as
    r_dequeue_options    dbms_aq.dequeue_options_t;
    r_message_properties dbms_aq.message_properties_t;
    v_message_handle     raw(16);
    o_payload            LOCAL_OBJ_MSG;
begin
    -- Dequeue exactly the message this notification is about, on behalf of
    -- the consumer named in the descriptor.
    r_dequeue_options.msgid         := descr.msg_id;
    r_dequeue_options.consumer_name := descr.consumer_name;

    dbms_aq.dequeue(queue_name         => descr.queue_name,
                    dequeue_options    => r_dequeue_options,
                    message_properties => r_message_properties,
                    payload            => o_payload,
                    msgid              => v_message_handle);

    insert into sepa_omsg_aq_demo
        (message)
    values
        (o_payload);

    -- Dequeue and insert are committed together.
    commit;
exception
    when others then
        -- Undo the dequeue and the insert, then surface the error.
        rollback;
        raise;
end;
/
-- Register REMOTE_CALLBACK_TEST as the PL/SQL notification callback for
-- consumer LOCAL_oTEST_subscriber on queue REMOTE_oTEST.
-- Note: the consumer name is the subscriber defined in the local database.
DECLARE
    l_registration sys.aq$_reg_info :=
        sys.aq$_reg_info('REMOTE_oTEST:LOCAL_oTEST_subscriber',
                         dbms_aq.namespace_aq,
                         'plsql://REMOTE_CALLBACK_TEST',
                         hextoraw('FF'));
BEGIN
    -- A list holding the single registration, and its element count.
    DBMS_AQ.REGISTER(sys.aq$_reg_info_list(l_registration), 1);
END;
/
将消息排队到 LOCAL AQ 表的脚本:
-- Enqueue one LOCAL_OBJ_MSG test message ('a') on LOCAL_oTEST with default
-- enqueue options and message properties, then commit.
DECLARE
    l_enqueue_opts dbms_aq.enqueue_options_t;
    l_msg_props    dbms_aq.message_properties_t;
    l_payload      local_obj_msg := local_obj_msg('a');
    l_msgid        RAW(16);
BEGIN
    DBMS_AQ.ENQUEUE(
        queue_name         => 'LOCAL_oTEST',
        enqueue_options    => l_enqueue_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msgid
    );
    COMMIT;
END;
/
另请注意,在 SYS.AQ$_MESSAGE_TYPES 表(LOCAL 数据库)中,所创建的传播对应的已验证(VERIFIED)状态为 'F',并且执行下面的脚本:
-- Ask AQ whether the local source queue and the remote destination queue
-- (reached over the database link) have compatible types, and print the
-- returned code.
DECLARE
    l_compatible BINARY_INTEGER;
BEGIN
    DBMS_AQADM.VERIFY_QUEUE_TYPES(
        src_queue_name  => 'local_otest',
        dest_queue_name => 'schema_name.remote_otest',
        destination     => 'db_link_name',
        rc              => l_compatible
    );
    DBMS_OUTPUT.PUT_LINE('Compatible: ' || l_compatible);
END;
/
返回 0,这意味着两个队列的类型不兼容。经过一番排查,我发现如果 LOCAL 和 REMOTE 数据库的 NLS_LENGTH_SEMANTICS 不同,就无法进行远程传播,但这里并不是这种情况,我已经检查过了。有人知道我哪里做错了吗?或者我该如何找出这两个队列表之间的差异并解决它?又或者,问题可能出在某些数据库参数值的差异上?
Oracle 数据库 11g 版本 11.2.0.3.0