4

假设有两个表TST_SAMPLE (10000 rows)TST_SAMPLE_STATUS (empty)

我想遍历每条记录TST_SAMPLE并相应地添加一条记录TST_SAMPLE_STATUS

在一个简单的线程中:

begin
  for r in (select * from TST_SAMPLE)
  loop 

    insert into TST_SAMPLE_STATUS(rec_id, rec_status)
    values (r.rec_id, 'TOUCHED');

  end loop;

  commit;
end;
/

在多线程解决方案中有一种情况,我不清楚。那么你能解释一下是什么原因导致处理一行TST_SAMPLE多次。

请参阅下面的详细信息。

create table TST_SAMPLE(
  rec_id       number(10) primary key 
);

create table TST_SAMPLE_STATUS(
  rec_id       number(10),
  rec_status   varchar2(10),
  session_id   varchar2(100)
);


begin
  insert into TST_SAMPLE(rec_id)
  select LEVEL from dual connect by LEVEL <= 10000;

  commit;
end;
/


CREATE OR REPLACE PROCEDURE tst_touch_recs(pi_limit int) is
  v_last_iter_count int;
begin

   loop

     v_last_iter_count := 0;

     --------------------------
     for r in (select *
                 from TST_SAMPLE A
                where rownum < pi_limit
                  and NOT EXISTS (select null
                                    from TST_SAMPLE_STATUS B
                                   where B.rec_id = A.rec_id)
                  FOR UPDATE SKIP LOCKED)
     loop

        insert into TST_SAMPLE_STATUS(rec_id, rec_status, session_id)
        values (r.rec_id, 'TOUCHED', SYS_CONTEXT('USERENV', 'SID'));

        v_last_iter_count := v_last_iter_count + 1;
     end loop;

     commit;
     --------------------------

     exit when v_last_iter_count = 0;

   end loop;
end;
/

FOR-LOOP我尝试迭代以下行: - 没有状态(NOT EXISTS 子句) - 当前未锁定在另一个线程中(FOR UPDATE SKIP LOCKED)

对于迭代中的确切行数没有要求。这里pi_limit只是一个批次的最大尺寸。唯一需要的是TST_SAMPLE在一个会话中处理每一行。

所以让我们在 3 个线程中运行这个过程。

declare
 v_job_id number;
begin

  dbms_job.submit(v_job_id, 'begin tst_touch_recs(100); end;', sysdate);
  dbms_job.submit(v_job_id, 'begin tst_touch_recs(100); end;', sysdate);
  dbms_job.submit(v_job_id, 'begin tst_touch_recs(100); end;', sysdate);

  commit;
end;

出乎意料的是,我们看到在几个会话中处理了一些行

select count(unique rec_id) AS unique_count,
       count(rec_id)        AS total_count
  from TST_SAMPLE_STATUS;


| unique_count | total_count |
------------------------------
|        10000 |       17397 |
------------------------------


-- run to see duplicates
select * 
  from TST_SAMPLE_STATUS 
 where REC_ID in (
                    select REC_ID 
                      from TST_SAMPLE_STATUS
                     group by REC_ID
                    having count(*) > 1
                 )
 order by REC_ID;

请帮助识别程序实施中的错误tst_touch_recs

4

1 回答 1

3

这是一个小例子,说明了为什么要读取行两次。

在两个会话中运行以下代码,在第一个会话后几秒钟开始第二个:

declare
  cursor c is 
    select a.*
     from TST_SAMPLE A
    where rownum < 10
      and NOT EXISTS (select null
                        from TST_SAMPLE_STATUS B
                       where B.rec_id = A.rec_id)
      FOR UPDATE SKIP LOCKED;

  type rec is table of c%rowtype index by pls_integer;
  rws rec;
begin
  open c; -- data are read consistent to this time

  dbms_lock.sleep ( 10 );

  fetch c 
  bulk  collect 
  into  rws;

  for i in 1 .. rws.count loop
    dbms_output.put_line ( rws(i).rec_id );
  end loop;

  commit;

end;
/

您应该看到两个会话都显示相同的行。

为什么?

因为 Oracle 数据库具有语句级一致性,所以当您打开游标时,两者的结果集都会被冻结。

但是,当您有 SKIP LOCKED 时,FOR UPDATE 锁定仅在您获取 rows 时启动。

因此会话 1 开始并找到不在 TST_SAMPLE_STATUS 中的前 9 行。然后等待 10 秒。

如果您在这 10 秒内启动会话 2,则光标将查找相同的 9 行

此时没有行被锁定。

现在,这就是有趣的地方。

第一个会话的睡眠将结束。然后它将获取行,锁定它们并跳过任何已经锁定的行。

不久之后,它就会提交。释放锁

片刻之后,会话 2 开始读取这些行。此时锁定

所以没有什么可以跳过的。

你如何解决这个问题取决于你想要做什么。

假设您不能转向基于集合的方法,您可以通过添加使事务可序列化:

set transaction isolation level serializable;

在光标循环之前。然后这将转移到事务级别的一致性。使数据库能够在获取行时检测“某些已更改”。

但是您需要ORA-08177: can't serialize access for this transaction在外部循环中捕获错误。或者任何重新读取相同行的进程都会在此时退出。

或者,正如评论者所建议的那样,使用高级排队。

于 2019-02-26T17:19:27.587 回答