0

我有一个数据库来维护要由各种处理机器处理的作业。因此,它的基本模式是:

+-------------+--------------+------+-----+---------+----------------+
| Field       | Type         | Null | Key | Default | Extra          |
+-------------+--------------+------+-----+---------+----------------+
| ID          | int(11)      | NO   | PRI | NULL    | auto_increment |
| EndTime     | datetime     | YES  |     | NULL    |                |
| GroupID     | varchar(255) | NO   | MUL | NULL    |                |
| HostAddress | varchar(15)  | YES  |     | NULL    |                |
| StartTime   | datetime     | YES  |     | NULL    |                |
+-------------+--------------+------+-----+---------+----------------+

ID 是自增的,HostAddress 代表处理这个 Job 的机器,StartTime 代表最近尝试处理它的开始时间,EndTime 是它成功完成处理的时间,GroupID 是一个任意字符串,用于引用其他表。

所有的加工机器都围绕着这张桌子同步进行抓取工作。新记录只能手动插入,尽管所有处理机器都可以更新现有记录。这个想法是让一台处理机器在它不工作时执行以下操作:

  • 查看是否有任何作业属于它(HostAddress = 它的 IP)并且尚未启动。
  • 如果没有,请查看是否有任何作业尚未申请(HostAddress IS NULL)。
  • 如果有无人认领的工作,请认领一些(将 HostAddress 更新为其 IP)。
  • 处理属于它的所有作业(与 #1 相同的检查,除了我们可能通过 #3 添加了一些)。

我原以为这一系列操作会导致数据库为我同步不同机器对同一工作的尝试;即使两台机器试图同时申请同一个工作,也只有其中一个 IP 最终会出现在 HostAddress 列中,因此当他们再次要求其 HostAddress 上的所有工作时,只有其中一个会得到该工作。

但情况似乎并非如此。昨晚几乎同时启动 35 台处理机器时,我观察到多台机器处理同一个作业的多个案例,尽管其中只有一个最终在数据库中声明了它。这对我来说意味着最后一次检查没有正常工作。这是我正在做的更具体的版本。数据库调用使用 em.createNamedQuery ,为简洁起见,我将在它们下方进行总结。JPA 由 Hibernate 3.6.8 提供,数据库是 MySQL 5.1.61。

protected void poll(EntityManager em) {
    List<JobRecord> candidates = null;
    //Synchronized only for this machine. Others are running concurrently.
    synchronized (em) {
        //Check if anything is already claimed by us.
        candidates = JobRecord.selectReady(em);
        //SELECT record FROM JobRecord record WHERE HostAddress=[IP]
        //    AND StartTime IS NULL AND EndTime IS NULL;
            if (candidates.isEmpty()) {
            //None claimed. Check if any jobs aren't claimed by anyone.
            candidates = JobRecord.selectAvailable(em);
            //SELECT record FROM JobRecord record WHERE HostAddress IS NULL
            //    AND StartTime IS NULL AND EndTime IS NULL;
            if (candidates.isEmpty()) {
                //All jobs have been processed.
                return;
            }
            //Claim these jobs we found for ourselves.
            em.getTransaction().begin();
            for (JobRecord job : candidates) {
                job.setStartTime(null);
                job.setEndTime(null);
                job.setHostAddress([IP]);
                em.merge(job);
            }
            em.getTransaction().commit;
            //Only process what is actually claimed by us; could be nothing.
            candidates = JobRecord.selectReady(em);
            //(The first query again.)
        }
    //Do processing with candidates list.
}

The only explanation that comes to mind is that when I do the em.getTransaction().commit the results are cached somehow, and that when I do the selectReady NamedQuery just after it, it's returning the cached result without bothering to consult the database. But that might not even be the case, and I'm not sure I could prove that. There might even be something fundamentally flawed with my scheme that I'm overlooking.

So, to actually pose my question, why did this database synchronization routine fail and what can I do to correct it?

4

1 回答 1

2

Multiple machines can invoke selectAvailable() before any of them perform the UPDATE transaction. Consequently, they may each think that the same jobs are available.

You need to begin the transaction before the selectAvailable() call, which should use SELECT ... FOR UPDATE in order to lock the available job records so that no other database connection can read from them until the transaction is committed.

于 2012-05-09T18:15:29.403 回答