0

环境:Oracle 12c

我目前有一个 Oracle Advanced Queue 系统设置如下:

NAME                  QUEUE_TABLE     QID       QUEUE_TYPE      MAX_RETRIES  RETRY_DELAY  RETENTION
---------------       --------------- -------   --------------- -----------  -----------  ---------
MY_WORK_Q             MY_WORK_QT      2518333   NORMAL_QUEUE    100          0            0 
MY_WORK_QT_E          MY_WORK_QT      2518332   EXCEPTION_QUEUE 0            0            0

我还注册了一个调用 PL/SQL 包过程的回调。

出于某种原因,我似乎遇到了丢失消息或消息未出队的情况。

基于此,我有以下问题:

  1. 我是否正确设置了我的实际队列 - 带有 MAX_RETRIES 和其他信息的 MY_WORK_Q?
  2. 有没有办法检查已排队的消息?
  3. 有没有办法检查已经出队的消息?
  4. 有没有办法检查 EXCEPTION_QUEUE/Exception 表以查看丢失的消息是否已到达那里?

我只是不明白为什么我在排队系统中丢失了消息,以及我可以检查什么以查看可能导致问题的原因。

我还将我的 MAX_RETRIES 增加到 100,但似乎仍然存在问题。

更新

好像我得到了error ORA-25263: no message in queue.

我不确定这是否与 dbms_aq.enqueue 或 dbms_aq.dequeue 调用的时间问题有关,但在 dbms_aq.dequeue 我有这个设置:

l_dequeue_options.wait := dbms_aq.no_wait;

我需要说 10 秒的等待而不是 no_wait 吗?我不确定这是否可能以及是否需要在入队或出队步骤上等待。

4

1 回答 1

0

这是一个有效的简单示例。也许您可以使用它来确定您的问题?

-- DROP TYPE some_type_t FORCE;

CREATE OR REPLACE TYPE some_type_t AS OBJECT (
  a NUMBER,
  b VARCHAR(100),
  c DATE
);

CREATE OR REPLACE PROCEDURE some_type_callback(CONTEXT  IN RAW,
                                               reginfo  IN sys.aq$_reg_info,
                                               descr    IN sys.aq$_descriptor,
                                               payload  IN RAW,
                                               payloadl IN NUMBER) AS

  -- Local variables
  v_dequeue_options    dbms_aq.dequeue_options_t;
  v_message_properties dbms_aq.message_properties_t;
  v_message_handle     RAW(26);
  v_some_type          some_type_t;
BEGIN

  -- Set the dequeue options from the descriptor
  v_dequeue_options.consumer_name := descr.consumer_name;
  v_dequeue_options.msgid         := descr.msg_id;

  -- Dequeue the message
  dbms_aq.dequeue(queue_name         => descr.queue_name,
                  dequeue_options    => v_dequeue_options,
                  message_properties => v_message_properties,
                  payload            => v_some_type,
                  msgid              => v_message_handle);
END some_type_callback;
/

SELECT *
  FROM user_errors e
 WHERE e.name = 'SOME_TYPE_CALLBACK';

BEGIN
 -- dbms_aqadm.drop_queue_table(queue_table => 'some_type_qt',
 --                             force       => TRUE);

  dbms_aqadm.create_queue_table(queue_table        => 'some_type_qt',
                                queue_payload_type => 'some_type_t',
                                multiple_consumers => TRUE);

  dbms_aqadm.create_queue(queue_name     => 'some_type_q',
                          queue_table    => 'some_type_qt',
                          retention_time => 86400); -- 1 day

  dbms_aqadm.start_queue(queue_name => 'some_type_q');

  dbms_aqadm.add_subscriber(queue_name => 'some_type_q',
                            subscriber => sys.aq$_agent(NAME     => 'some_type_qs',
                                                        address  => NULL,
                                                        protocol => NULL));

  dbms_aq.register(sys.aq$_reg_info_list(sys.aq$_reg_info('some_type_q:some_type_qs',
                                                          dbms_aq.namespace_aq,
                                                          'plsql://some_type_callback',
                                                          hextoraw('FF'))),
                   1);
END;
/

SELECT *
  FROM aq$some_type_qt;
-- nothing

DECLARE
  v_some_type some_type_t;
  eopt        dbms_aq.enqueue_options_t;
  mprop       dbms_aq.message_properties_t;
  enq_msgid   RAW(16);
BEGIN
  v_some_type := some_type_t(a => 42,
                             b => 'forty-two',
                             c => to_date('1/1/2942',
                                          'mm/dd/yyyy'));

  dbms_aq.enqueue(queue_name         => 'some_type_q',
                  enqueue_options    => eopt,
                  message_properties => mprop,
                  payload            => v_some_type,
                  msgid              => enq_msgid);
END;
/

SELECT *
  FROM aq$some_type_qt;
-- msg_state = READY => PROCESSED

肯定会帮助您的一件事是在您的队列表上设置保留时间。然后,您可以使用队列表的 aq$ 引用来查看消息。msg_state 列将显示 READY 表示新消息,而 PROCESSED 表示已使用的消息。那里还有一些其他列可能会有所帮助:例如 retry_count。

如果您收到 ORA-25263,似乎不是处理您在回调中被激怒的一条消息,而是您试图读取另一条消息并与另一个开始使用队列的作业发生冲突。这可以通过我在调用出队之前的回调中的两行来解决。

如果您需要在消息到达时触发然后保留消息顺序,则需要为回调添加一些锁定和额外的复杂性。您可以查看 Metalink 225810.1 以获取有关如何执行此操作的示例。

于 2020-05-22T15:25:36.407 回答