这是我自定义的方法:将工作并行化为 N 个独立的作业,同时保留 dbms_scheduler 的日志记录和背压支持。各个日期区间按 mod N 分配给这 N 个作业。
-- Progress log for message_fixup: one row per processed source date.
create table message_fixup_log (
    -- the date that was (or is being) processed
    source_date         date          not null,
    -- when processing of source_date started
    started_at          timestamp(6)  not null,
    -- when processing finished; null until the run completes
    finished_at         timestamp(6),
    -- row count reported by the run; null until the run completes
    fixed_message_count number(10),
    -- virtual column: elapsed time of the run, null while finished_at is null
    duration            as (finished_at - started_at)
);

-- At most one log row per source_date; the index is stored in descending date order.
create unique index ix_message_fixup_log_date
    on message_fixup_log (source_date desc);
create or replace procedure message_fixup(jobNumber number, jobCount number, jobName varchar default null)
-- Processes the next pending source date of message_part on behalf of one of
-- jobCount parallel workers, and records the run in message_fixup_log.
--
-- Dates are split between workers by remainder: with minSince being the
-- earliest day found in message_part, worker jobNumber handles the dates
-- minSince + jobNumber, minSince + jobNumber + jobCount, and so on.
-- Each call handles exactly one date and resumes after the latest date this
-- worker has already finished according to message_fixup_log.
--
-- Parameters:
--   jobNumber  zero-based index of this worker (expected 0 .. jobCount - 1;
--              not validated here)
--   jobCount   total number of workers sharing the date range
--   jobName    optional name of the dbms_scheduler job running this procedure;
--              when given and there is nothing left to process, the job's
--              end_date is set to one second from now (presumably so the
--              scheduler stops repeating it - confirm against job setup)
--
-- The procedure does not commit; the caller (or the scheduler) owns the
-- transaction.
is
  minSince date;
  maxSince date;
  since date;
  msgUpdatedCount number;
begin
  -- choose interval: [minSince, maxSince) covers every day present in
  -- message_part; both values are null when the table is empty
  select trunc(min(ts)), trunc(max(ts)) + 1
    into minSince, maxSince
    from message_part;

  if minSince is not null then
    -- Next date after the last one this worker finished. An aggregate query
    -- without group by always returns exactly one row, so no no_data_found
    -- handling is needed; max() is simply null when no row qualifies.
    select max(source_date) + jobCount
      into since
      from message_fixup_log
     where finished_at is not null
       and mod(source_date - minSince, jobCount) = jobNumber
       and source_date >= minSince;

    -- nothing finished yet for this worker: start at its first date
    if since is null then
      since := minSince + jobNumber;
    end if;
  end if;

  -- Nothing to do: either message_part is empty (minSince is null) or this
  -- worker has gone past the last day with data. The explicit null check
  -- matters: a null "since" would otherwise skip this guard and fail on the
  -- not-null source_date column in the insert below.
  if minSince is null or since >= maxSince then
    if jobName is not null then
      dbms_scheduler.set_attribute(jobName, 'end_date', systimestamp + interval '1' second);
    end if;
    return;
  end if;

  insert into message_fixup_log(source_date, started_at) values(since, systimestamp);

  -- perform some actual work for chosen interval

  -- sql%rowcount reports the rows affected by the most recently executed SQL
  -- statement, so it must be read right after the fix-up DML that replaces
  -- the placeholder above (with no such DML it reflects the insert).
  msgUpdatedCount := sql%rowcount;

  update message_fixup_log
     set fixed_message_count = msgUpdatedCount,
         finished_at = systimestamp
   where source_date = since;
end;
-- manual test
--call message_fixup(0, 1);
-- (Re)creates jobCount scheduler jobs named message_fixup_job0 .. message_fixup_job<N-1>.
-- Each job calls message_fixup(jobNumber, jobCount, jobName) once a minute,
-- starting immediately, with full scheduler logging enabled.
declare
  jobName varchar2(256);
  jobCount number default 8;
  -- ORA-27475 is raised by dbms_scheduler when the named job does not exist
  jobMissing exception;
  pragma exception_init(jobMissing, -27475);
begin
  for jobNumber in 0..(jobCount-1) loop
    jobName := 'message_fixup_job' || jobNumber;

    -- Drop a job left over from a previous run (force => true also stops it
    -- if it is running). Only "job does not exist" is ignored; any other
    -- failure, such as missing privileges, is allowed to propagate instead
    -- of being silently swallowed.
    begin
      dbms_scheduler.drop_job(job_name => jobName, force => true);
    exception
      when jobMissing then null;
    end;

    -- Created disabled so that arguments and attributes can be set before
    -- the first run.
    dbms_scheduler.create_job(
      job_name => jobName,
      job_type => 'stored_procedure',
      job_action => 'message_fixup',
      enabled => false,
      start_date => systimestamp,
      repeat_interval => 'freq = minutely; interval = 1',
      number_of_arguments => 3
    );
    dbms_scheduler.set_attribute(jobName, 'logging_level', dbms_scheduler.logging_full);

    -- arguments 1 and 2 are numbers and are passed as anydata; argument 3 is
    -- the job's own name, passed as a string
    dbms_scheduler.set_job_anydata_value(jobName, 1, ANYDATA.ConvertNumber(jobNumber));
    dbms_scheduler.set_job_anydata_value(jobName, 2, ANYDATA.ConvertNumber(jobCount));
    dbms_scheduler.set_job_argument_value(jobName, 3, jobName);

    dbms_scheduler.enable(jobName);
  end loop;
end;