2

我有一个数据库场景(我正在使用 Oracle),其中几个进程将插入到一个表中,并且一个进程从中选择。该表主要用作中间存储,多个进程(以下称为 Writers)向其写入日志事件,而单个进程(以下称为 Reader)则从中读取事件以进行进一步处理。Reader 必须读取插入到表中的所有事件。

目前,这是通过为每个插入的记录分配一个升序的 id 来完成的。读取器周期性地从表中选择一个条目块,其中 id 大于先前读取块的最大 id。例如:

SELECT
  *
FROM
  TRANSACTION_LOG
WHERE
  id > (
    SELECT
      last_id
    FROM
      READER_STATUS
   );

这种方法的问题在于,由于写入器是并发操作的,因此行并不总是根据其分配的 id 按顺序插入,即使这些是按顺序升序分配的。也就是说,有时会在 id=110 的记录之后写入 id=100 的行,因为写入 id=110 的行的过程在写入记录 id=100 的进程之后开始,但首先提交。如果读取器已经读取了 id=110 的行,这可能会导致读取器丢失 id=100 的行。

强制写入者对表进行排他锁将解决问题,因为这将强制它们按顺序插入,并且还会让读取器等待任何未完成的提交。然而,这可能不会很快。

我的想法是,读者在阅读之前等待任何未完成的 Writer 提交就足够了。也就是说,只要 Reader 确实读取,Writers 就可以继续并发操作,直到所有 writers 完成。

我的问题是:
如何指示我的读者进程等待我的作家进程的任何未完成的提交?也欢迎对上述问题提出任何替代建议。

4

5 回答 5

1

有趣的问题。听起来您正在构建一个不错的解决方案。
我希望我能帮上忙。

几个建议...

作家状态

您可以创建一个表 WRITER_STATUS,它有一个 last_id 字段:每个写入者在使用要写入日志的 ID 写入之前更新此表,但前提是其 ID 大于 last_id 的当前值。

读者还检查了这个表,现在知道是否有任何作家还没有写过。

读者日志

这可能更有效。
阅读器进行阅读后,它会检查检索到的记录中是否存在任何漏洞。
然后它将所有丢失的 ID 记录到 MISSING_IDS 表中,并在下一次读取时执行类似的操作

SELECT *
FROM   TRANSACTION_LOG
WHERE  id > (SELECT last_id
             FROM   READER_STATUS)
OR     id IN ( SELECT id from MISSING_IDS ) 
于 2008-09-30T09:31:41.820 回答
1

您可能希望在读取器进程中对表设置排他锁。这将等到所有写入程序完成并释放他们的行锁,因此您可以确定没有未完成的写入程序事务。

于 2008-09-30T09:45:57.900 回答
1

我不会做任何锁定,这会干扰并发性和吞吐量。

如果您逐行跟踪已处理的日志行,则您也不需要 Reader_Status 表。

这就是我要做的:在您的日志表中添加一个新列。例如,将其称为“已处理”。使其成为布尔值,默认为 false(或小整数,默认为 0 或其他)。写入器在插入时使用默认值。

当 Reader 查询下一个要处理的记录块时,他会查询已处理为 false 且 id 值较低的行。

SELECT * FROM Transaction_Log
WHERE processed = 0
ORDER BY id
LIMIT 10;

在处理它们时,Reader 使用 UPDATE 将处理的从 false 更改为 true。所以下次 Reader 查询一个记录块时,他确信他不会得到他已经处理过的行。

UPDATE Transaction_Log
SET processed = 1
WHERE id = ?;  -- do this for each row processed

此 UPDATE 不应与 Writer 执行的 INSERT 操作冲突。

如果任何行被其他 Writer 不按顺序提交,Reader 将在下次查询时看到它们,如果他总是按照 id 列从最低值到最高值的顺序处理它们。

于 2008-09-30T17:16:22.233 回答
0

我同意 AJ 的解决方案(链接)。此外,以下建议可能有助于减少孔的数量。

1)Oracle Sequence用于创建id并使用auto-increment如下

INSERT INTO transaction_table VALUES(id__seq.nextval, <other columns>);

2)使用autoCommit(true)以便插入将立即提交。

这两个步骤将大大减少孔的数量。仍然有可能一些插入首先开始但后来提交,并且在两者之间发生了读取操作。

于 2008-10-01T03:46:52.263 回答
0

由于您知道last_id由 Reader 处理,您可以通过以下方式请求下一个工作项:

select * from Transaction_log where id = (
  select last_id + 1 /* or whatever increment your sequencer has */
    from Reader_status)
于 2008-09-30T18:46:29.600 回答