
I have a table to which rows are only appended (not updated or deleted), inside transactions (I'll explain why that's important), and I need to fetch the new, previously unfetched rows of this table every minute with a cron job.

How am I going to do this? In any programming language (I use Perl, but that's irrelevant).

I list the ways I have thought of to solve this problem, and ask you to show me the correct one (there HAS to be one...).

The first way that popped into my head was to save (in a file) the largest auto_incrementing id of the rows fetched, so in the next minute I can fetch with: WHERE id > $last_id. But that can miss rows. Because new rows are inserted in transactions, it's possible that the transaction that saves the row with id = 5 commits before the transaction that saves the row with id = 4. It's therefore possible that the cron script retrieves row 5 but not row 4, and when row 4 gets committed a split second later, it will never get fetched (because 4 is not > 5, which is the $last_id).
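
For illustration, that first approach boils down to something like this (just a sketch; the table name main_table and the literal 1234 are made-up placeholders):

-- 1234 stands for the $last_id saved by the previous run of the cron script
SELECT *
FROM main_table
WHERE id > 1234
ORDER BY id;
-- the script then stores the largest id it received as the new $last_id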

Then I thought I could make the cron job fetch all rows that have a date field within the last TWO minutes, check which of those rows were already retrieved in the previous run of the cron job (to do this I would need to save somewhere which row ids were retrieved), compare, and process only the new ones. Unfortunately this is complicated, and it also doesn't solve the problem that occurs if a certain inserting transaction takes TWO AND A HALF minutes to commit for some weird database reason, which will make the date too old for the next iteration of the cron job to fetch.
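
A sketch of this second idea (again with the made-up main_table and a hypothetical created_at datetime column; the comparison against the previous run's saved ids would happen in the script):

SELECT *
FROM main_table
WHERE created_at >= NOW() - INTERVAL 2 MINUTE;
-- the script then discards every id it already processed in the previous run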

Then I thought of installing a message queue (MQ) like RabbitMQ or any other. The same process that does the inserting transaction would notify RabbitMQ of the new row, and RabbitMQ would then notify an always-running process that processes new rows. So instead of getting a batch of rows inserted in the last minute, that process would get the new rows one by one as they are written. This sounds good, but it has too many points of failure - RabbitMQ might be down for a second (during a restart, for example), and in that case the insert transaction will have committed without the receiving process ever receiving the new row. So the new row will be missed. Not good.

I just thought of one more solution: the receiving processes (there are 30 of them, doing exactly the same job on exactly the same data, so the same rows get processed 30 times, once by each receiving process) could write to another table that they have processed row X when they process it; then, when the time comes, they can ask for all rows in the main table that don't exist in the "have_processed" table with an OUTER JOIN query. But I believe (correct me if I'm wrong) that such a query will consume a lot of CPU and disk I/O on the DB server, since it will have to compare the entire list of ids of the two tables to find the new entries (and the table is huge and getting bigger each minute). It would have been fast if there were only one receiving process - then I would have been able to add an indexed field named "have_read" to the main table, which would make looking for new rows extremely fast and easy on the DB server.
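
The query I have in mind for this last idea would be roughly the following (made-up names; have_processed would hold (process_id, row_id) pairs, and 7 stands for one receiving process's id):

SELECT mt.*
FROM main_table mt
LEFT JOIN have_processed hp
    ON hp.row_id = mt.id AND hp.process_id = 7
WHERE hp.row_id IS NULL;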

What is the right way to do it? What do you suggest? The question is simple, but a solution seems hard (for me) to find.

Thank you.


5 Answers


I believe the 'best' way to do this would be to use one process that checks for new rows and delegates them to the thirty consumer processes. Then your problem becomes simpler to manage from a database perspective and a delegating process is not that difficult to write.

If you are stuck with communicating to the thirty consumer processes through the database, the best option I could come up with is to create a trigger on the table, which copies each row to a secondary table. Copy each row to the secondary table thirty times (once for each consumer process). Add a column to this secondary table indicating the 'target' consumer process (for example a number from 1 to 30). Each consumer process checks for new rows with its unique number and then deletes those. If you are worried that some rows are deleted before they are processed (because the consumer crashes in the middle of processing), you can fetch, process and delete them one by one.

Since the secondary table is kept small by continuously deleting processed rows, INSERTs, SELECTs and DELETEs would be very fast. All operations on this secondary table would also be indexed by the primary key (if you place the consumer ID as first field of the primary key).

In MySQL, this would look something like this:

CREATE TABLE `consumer`(
    `id` INTEGER NOT NULL,
    PRIMARY KEY (`id`)
);
INSERT INTO `consumer`(`id`) VALUES
(1),
(2),
(3)
-- all the way to 30
;

CREATE TABLE `secondaryTable` LIKE `primaryTable`;
ALTER TABLE `secondaryTable` ADD COLUMN `targetConsumerId` INTEGER NOT NULL FIRST;
-- alter the secondary table further to allow several rows with the same primary key (by adding targetConsumerId to the primary key)

DELIMITER //
CREATE TRIGGER `mark_to_process` AFTER INSERT ON `primaryTable`
FOR EACH ROW
BEGIN
    -- by doing a cross join with the consumer table, this automatically inserts
    -- the correct number of rows; adding or deleting consumers is just a matter
    -- of adding or deleting rows in the consumer table
    INSERT INTO `secondaryTable`(`targetConsumerId`, `primaryTableId`, `primaryTableField1`, `primaryTableField2`)
    SELECT `consumer`.`id`, `primaryTable`.`id`, `primaryTable`.`field1`, `primaryTable`.`field2`
    FROM `consumer`, `primaryTable`
    WHERE `primaryTable`.`id` = NEW.`id`;
END//
DELIMITER ;

-- loop over the following statements in each consumer until the SELECT doesn't return any more rows
START TRANSACTION;
SELECT * FROM secondaryTable WHERE targetConsumerId = MY_UNIQUE_CONSUMER_ID LIMIT 1;
-- here, do the processing (so before the COMMIT so that crashes won't let you miss rows)
DELETE FROM secondaryTable WHERE targetConsumerId = MY_UNIQUE_CONSUMER_ID AND primaryTableId = PRIMARY_TABLE_ID_OF_ROW_JUST_SELECTED;
COMMIT;
answered 2013-09-06T00:16:21.463

I have been giving this some thought. So, let me see if I got this right. You have a huge table into which N processes write, where N may change over time (let's call them producers). Now, there are M other processes, with M also changing over time, that need to process each record at least once after it has been added (let's call them consumers).

The main problems detected are:

  • Make sure the solution works with N and M changing dynamically
  • The unprocessed records need to be tracked per consumer
  • Because of the huge amount of records, the solution has to scale as well as possible

To solve these problems, I came up with this. Create this table (the two columns together form the PK):

  • PENDING_RECORDS(ConsumerID, HugeTableID)

Modify the producers so that every time they add a record to HUGE_TABLE, they also add M records to the PENDING_RECORDS table, so that it holds the HugeTableID together with every ConsumerID that exists at that moment. Every time a consumer runs, it queries the PENDING_RECORDS table and finds a small number of matches for itself. It then joins against HUGE_TABLE (note that it is an inner join, not a left join) to fetch the actual data it needs to process. Once the data has been processed, the consumer deletes the records it fetched from the PENDING_RECORDS table, keeping it small.
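
In rough MySQL this could look like the sketch below (the names are only illustrative, and the list of consumers is assumed to live in a small table of its own):

CREATE TABLE PENDING_RECORDS (
    ConsumerID  INTEGER NOT NULL,
    HugeTableID INTEGER NOT NULL,
    PRIMARY KEY (ConsumerID, HugeTableID)
);

-- producer side: right after inserting the row with id = 12345 into HUGE_TABLE
INSERT INTO PENDING_RECORDS (ConsumerID, HugeTableID)
SELECT id, 12345 FROM consumers;

-- consumer side (say consumer number 7): fetch the pending work
SELECT h.*
FROM PENDING_RECORDS p
JOIN HUGE_TABLE h ON h.id = p.HugeTableID
WHERE p.ConsumerID = 7;

-- ...process the rows, then clean up what was just handled
DELETE FROM PENDING_RECORDS WHERE ConsumerID = 7 AND HugeTableID = 12345;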

answered 2013-09-06T00:21:17.200

Interesting, I have to say :)

1) First of all - is it possible to add a field to the append-only table (let's call it the "transactional table")? I mean, is it a design paradigm and you have a reason not to do any kind of updates on this table, or is it "structurally" blocked (i.e. the user connecting to the db does not have privileges to perform updates on this table)?

Because the easiest way would be to add a "have_read" column to that table with a default of 0, and update that column to 1 on the rows that get fetched (even if 30 processes do this at the same time, you should be fine, since it would be very fast and it would not corrupt your data). Even if 30 processes mark the same 1000 rows as fetched - nothing gets corrupted. Although, if you are not operating on InnoDB, this might not be the best approach in terms of performance (MyISAM locks the whole table on updates, InnoDB only locks the updated rows).
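
A minimal sketch of this first option (transactional_table and the index name are just placeholders):

ALTER TABLE transactional_table
    ADD COLUMN have_read TINYINT NOT NULL DEFAULT 0,
    ADD INDEX idx_have_read (have_read);

-- each run: grab the unread rows...
SELECT * FROM transactional_table WHERE have_read = 0;
-- ...and mark them (in practice, the script would mark the exact ids it just fetched)
UPDATE transactional_table SET have_read = 1 WHERE have_read = 0;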

2) If this is not something you can use - I would definitely check out the solution you offered as the last one, with a small modification. Create a table (say: fetched_ids), and save the ids of the fetched rows in it. Then you can use something like:

SELECT tt.* FROM transactional_table tt
LEFT JOIN fetched_ids fi ON tt.id = fi.row_id
WHERE fi.row_id IS NULL

This will return the rows from your transactional table that have not been saved as fetched yet. As long as both (tt.id) and (fi.row_id) have (ideally unique) indexes, it should work fine even on a large amount of data. MySQL handles JOINs on indexed fields very well. Don't be afraid to try it - create a new table, copy the ids into it, delete some of them and run your query. You will see the results and you will know whether they are satisfactory :)

P.S. Of course, the inserts into this 'fetched_ids' table should be done carefully so as not to create unnecessary duplicates (the 30 simultaneous processes could write 30 times the data you actually need - if you care about performance, you should watch out for this case).
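
One way to keep those inserts harmless would be something like this sketch (assuming fetched_ids only holds a row_id column; a per-process variant would add a process_id column to the key):

CREATE TABLE fetched_ids (
    row_id INTEGER NOT NULL,
    PRIMARY KEY (row_id)
);

-- INSERT IGNORE quietly skips ids that another process has already recorded
INSERT IGNORE INTO fetched_ids (row_id) VALUES (12345);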

answered 2013-09-05T23:46:51.697

I would try adding a timestamp column and using it as a reference when retrieving new rows.
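
For example, something along these lines (only a sketch with invented names; the cron job would remember the latest timestamp it has already seen):

ALTER TABLE main_table
    ADD COLUMN inserted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    ADD INDEX idx_inserted_at (inserted_at);

SELECT * FROM main_table
WHERE inserted_at > '2013-09-06 00:15:00';   -- the latest timestamp from the previous run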

answered 2013-09-06T00:16:09.793

How about a second table with a structure like this:

  • source_fk - this will hold the id of the data row you want to read.
  • process_id - this will be the unique id of one of the 30 processes.

Then do a LEFT JOIN and exclude from your source the items that already have an entry with the matching process_id.

Once you get your results, just go back and add the source_fk and process_id for every result you got.

One advantage of this is that you can add more processes later on without any problems.
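
As a sketch (every name except source_fk and process_id is invented, and 7 stands for one process's unique id):

CREATE TABLE processed (
    source_fk  INTEGER NOT NULL,
    process_id INTEGER NOT NULL,
    PRIMARY KEY (source_fk, process_id)
);

-- fetch the rows this process has not handled yet
SELECT s.*
FROM source_table s
LEFT JOIN processed p ON p.source_fk = s.id AND p.process_id = 7
WHERE p.source_fk IS NULL;

-- after handling a row with id 12345
INSERT INTO processed (source_fk, process_id) VALUES (12345, 7);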

answered 2013-09-06T00:06:57.427