这是一个有效的简单示例。也许您可以使用它来确定您的问题?
-- DROP TYPE some_type_t FORCE;
CREATE OR REPLACE TYPE some_type_t AS OBJECT (
a NUMBER,
b VARCHAR(100),
c DATE
);
CREATE OR REPLACE PROCEDURE some_type_callback(CONTEXT IN RAW,
reginfo IN sys.aq$_reg_info,
descr IN sys.aq$_descriptor,
payload IN RAW,
payloadl IN NUMBER) AS
-- Local variables
v_dequeue_options dbms_aq.dequeue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle RAW(26);
v_some_type some_type_t;
BEGIN
-- Set the dequeue options from the descriptor
v_dequeue_options.consumer_name := descr.consumer_name;
v_dequeue_options.msgid := descr.msg_id;
-- Dequeue the message
dbms_aq.dequeue(queue_name => descr.queue_name,
dequeue_options => v_dequeue_options,
message_properties => v_message_properties,
payload => v_some_type,
msgid => v_message_handle);
END some_type_callback;
/
SELECT *
FROM user_errors e
WHERE e.name = 'SOME_TYPE_CALLBACK';
BEGIN
-- dbms_aqadm.drop_queue_table(queue_table => 'some_type_qt',
-- force => TRUE);
dbms_aqadm.create_queue_table(queue_table => 'some_type_qt',
queue_payload_type => 'some_type_t',
multiple_consumers => TRUE);
dbms_aqadm.create_queue(queue_name => 'some_type_q',
queue_table => 'some_type_qt',
retention_time => 86400); -- 1 day
dbms_aqadm.start_queue(queue_name => 'some_type_q');
dbms_aqadm.add_subscriber(queue_name => 'some_type_q',
subscriber => sys.aq$_agent(NAME => 'some_type_qs',
address => NULL,
protocol => NULL));
dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('some_type_q:some_type_qs',
dbms_aq.namespace_aq,
'plsql://some_type_callback',
hextoraw('FF'))),
1);
END;
/
SELECT *
FROM aq$some_type_qt;
-- nothing
DECLARE
v_some_type some_type_t;
eopt dbms_aq.enqueue_options_t;
mprop dbms_aq.message_properties_t;
enq_msgid RAW(16);
BEGIN
v_some_type := some_type_t(a => 42,
b => 'forty-two',
c => to_date('1/1/2942',
'mm/dd/yyyy'));
dbms_aq.enqueue(queue_name => 'some_type_q',
enqueue_options => eopt,
message_properties => mprop,
payload => v_some_type,
msgid => enq_msgid);
END;
/
SELECT *
FROM aq$some_type_qt;
-- msg_state = READY => PROCESSED
肯定会帮助您的一件事是在您的队列表上设置保留时间。然后,您可以使用队列表的 aq$ 引用来查看消息。msg_state 列将显示 READY 表示新消息,而 PROCESSED 表示已使用的消息。那里还有一些其他列可能会有所帮助:例如 retry_count。
如果您收到 ORA-25263,似乎不是处理您在回调中被激怒的一条消息,而是您试图读取另一条消息并与另一个开始使用队列的作业发生冲突。这可以通过我在调用出队之前的回调中的两行来解决。
如果您需要在消息到达时触发然后保留消息顺序,则需要为回调添加一些锁定和额外的复杂性。您可以查看 Metalink 225810.1 以获取有关如何执行此操作的示例。