
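I'm planning to deploy an application with Play and have never used its "Jobs" before. My deployment is large enough to need several load-balanced Play servers, but my computations are not heavy enough to need Hadoop/Storm/others.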

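My question is: how do I handle this scenario in Play? If I set up a job in Play to run every minute, I don't want every server to do exactly the same thing at the same time.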

I could only find this answer, but I don't like any of the options it offers.

So, are there any tools or best practices for coordinating jobs across servers, or do I have to build something from scratch?


3 Answers


You can use a database flag, as described here for two jobs: "Playframework concurrent job management" by Pere Villega.

But I think Guillaume Bort's solution on Google Groups, which uses Memcache, is the best one. There seems to be a module for Play 2: https://github.com/mumoshu/play2-memcached
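
To make the Memcache idea concrete, here is a minimal sketch of a lock built on an atomic "add only if the key is absent" operation, which is the semantics of memcached's `add` command (in Play 1, `Cache.safeAdd` should be the analogous call, but check it against your version). `SharedCache`, `InMemorySharedCache` and `CacheJobLock` are hypothetical names made up for this illustration; the in-memory class only stands in for a memcached instance shared by all nodes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a shared cache with an atomic "add if absent" and a TTL. */
interface SharedCache {
    boolean addIfAbsent(String key, long ttlMillis, long nowMillis);

    void delete(String key);
}

/** In-memory fake for illustration only; real nodes would all talk to one memcached. */
class InMemorySharedCache implements SharedCache {
    private final ConcurrentMap<String, Long> expiryByKey = new ConcurrentHashMap<>();

    @Override
    public boolean addIfAbsent(String key, long ttlMillis, long nowMillis) {
        final boolean[] added = {false};
        // compute() is atomic per key, so only one caller can create or renew the entry
        expiryByKey.compute(key, (k, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                added[0] = true;
                return nowMillis + ttlMillis;
            }
            return expiry;
        });
        return added[0];
    }

    @Override
    public void delete(String key) {
        expiryByKey.remove(key);
    }
}

/** Runs a job only on the node that wins the atomic add. */
class CacheJobLock {
    private final SharedCache cache;
    private final long ttlMillis;

    CacheJobLock(SharedCache cache, long ttlMillis) {
        this.cache = cache;
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if this caller ran the job, false if another holder has the lock. */
    boolean runExclusively(String jobName, Runnable job) {
        String key = "joblock:" + jobName;
        if (!cache.addIfAbsent(key, ttlMillis, System.currentTimeMillis())) {
            return false;
        }
        try {
            job.run();
        } finally {
            // Release early; the TTL is only a safety net if this node dies mid-job
            cache.delete(key);
        }
        return true;
    }
}
```

The TTL should be longer than the longest expected run of the job, otherwise a second node could take the lock while the first one is still working.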

answered 2012-12-07T15:52:28.033

You can use a database table to store a job lock, but you have to check and update this lock in a separate transaction (you have to use JPA.newEntityManager for this).

My JobLock class uses a LockMode enum:

package enums;

public enum LockMode {
    FREE, ACQUIRED;
}

Here is the JobLock class:

package models;

import java.util.Date;
import java.util.List;

import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Version;

import play.Logger;
import play.Play;
import play.data.validation.Required;
import play.db.jpa.JPA;
import play.db.jpa.Model;
import utils.Parser;
import enums.LockMode;
import exceptions.ServiceException;

/**
 * Technical class that manages a lock in the database, so that multiple
 * instances can be synchronized and cannot run the same job at the same
 * time.
 * 
 * @author sebastien
 */
@Entity
public class JobLock extends Model {

    private static final Long MAX_ACQUISITION_DELAY = Parser.parseLong(Play.configuration.getProperty(
            "job.lock.acquisitiondelay", "10000"));

    @Required
    public String jobName;

    public Date acquisitionDate;

    @Required
    @Enumerated(EnumType.STRING)
    public LockMode lockMode;

    @Version
    public int version;

    // STATIC METHODS
    // ~~~~~~~~~~~~~~~

    /**
     * Acquire the lock for the type of job identified by the name parameter.
     * Acquisition of the lock is done in a separate transaction, so that this
     * transaction is as small as possible and other instances see the lock
     * acquisition sooner.
     * <p>
     * If we did not do that, the other instances would be blocked until the
     * instance that acquired the lock had finished its business transaction,
     * which could take a long time for a job.
     * </p>
     * 
     * @param name
     *            the name that identifies a job category, usually the job's
     *            simple class name
     * @return the lock object if the acquisition is successful, null otherwise
     */
    public static JobLock acquireLock(String name) {
        EntityManager em = JPA.newEntityManager();
        try {
            em.getTransaction().begin();
            List<JobLock> locks = em.createQuery("from JobLock where jobName=:name", JobLock.class)
                    .setParameter("name", name).setMaxResults(1).getResultList();
            JobLock lock = locks != null && !locks.isEmpty() ? locks.get(0) : null;
            if (lock == null) {
                lock = new JobLock();
                lock.jobName = name;
                lock.acquisitionDate = new Date();
                lock.lockMode = LockMode.ACQUIRED;
                em.persist(lock);
            } else {
                if (LockMode.ACQUIRED.equals(lock.lockMode)) {
                    if ((System.currentTimeMillis() - lock.acquisitionDate.getTime()) > MAX_ACQUISITION_DELAY) {
                        throw new ServiceException(String.format(
                                "Lock is held for too much time : there is a problem with job %s", name));
                    }
                    return null;
                }
                lock.lockMode = LockMode.ACQUIRED;
                lock.acquisitionDate = new Date();
                lock.willBeSaved = true;
            }
            em.flush();
            em.getTransaction().commit();
            return lock;
        } catch (Exception e) {
            // Do not log exception here because it is normal to have exception
            // in case of multi-node installation, this is the way to avoid
            // multiple job execution
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
            // Maybe we have to inverse the test and to define which exception
            // is not problematic : exception that denotes concurrency in the
            // database are normal
            if (e instanceof ServiceException) {
                throw (ServiceException) e;
            } else {
                return null;
            }
        } finally {
            if (em.isOpen()) {
                em.close();
            }
        }
    }

    /**
     * Release the lock in the database so that another instance can take it.
     * This action changes the {@link #lockMode} and sets
     * {@link #acquisitionDate} to null. This is done in a separate transaction,
     * which can see what happened in the database during the business
     * transaction.
     * 
     * @param lock
     *            the lock to release
     * @return true if we managed to release the lock, false otherwise
     */
    public static boolean releaseLock(JobLock lock) {
        if (lock == null || LockMode.FREE.equals(lock.lockMode)) {
            return false;
        }

        // Create the EntityManager only after the early return above, so that
        // it is never left open
        EntityManager em = JPA.newEntityManager();
        try {
            em.getTransaction().begin();
            lock = em.find(JobLock.class, lock.id);
            lock.lockMode = LockMode.FREE;
            lock.acquisitionDate = null;
            lock.willBeSaved = true;
            em.persist(lock);
            em.flush();
            em.getTransaction().commit();
            return true;
        } catch (Exception e) {
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
            Logger.error(e, "Error during commit of lock release");
            return false;
        } finally {
            if (em.isOpen()) {
                em.close();
            }
        }
    }
}
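
The `@Version` field is what makes a concurrent acquisition fail: if two nodes read the same row and both try to update it, only the first commit matches the stored version and the second is rejected. The following is a rough in-memory sketch of that check, not the JPA mechanism itself; `LockRow` and `VersionedLockTable` are hypothetical names used only for this illustration.

```java
/** Immutable snapshot of a lock row; illustrative only, not the JobLock entity. */
final class LockRow {
    final boolean acquired;
    final int version;

    LockRow(boolean acquired, int version) {
        this.acquired = acquired;
        this.version = version;
    }
}

/** Simulates a single database row guarded by a version column. */
class VersionedLockTable {
    private LockRow current = new LockRow(false, 0);

    synchronized LockRow read() {
        return current;
    }

    /**
     * Behaves like "UPDATE ... SET acquired = ?, version = version + 1
     * WHERE version = ?": returns true only if the expected version still matches.
     */
    synchronized boolean update(int expectedVersion, boolean acquired) {
        if (current.version != expectedVersion) {
            return false; // someone else committed first
        }
        current = new LockRow(acquired, expectedVersion + 1);
        return true;
    }

    /** Read the row, then try to flip it to acquired using the version just read. */
    boolean tryAcquire() {
        LockRow seen = read();
        return !seen.acquired && update(seen.version, true);
    }
}
```

With a real database, the rejected update surfaces as an exception at flush or commit time, which is why `acquireLock` treats most exceptions as "another node got the lock".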

Here is my LockAwareJob, which uses this lock:

package jobs;

import models.JobLock;
import notifiers.ExceptionMails;
import play.Logger;
import play.jobs.Job;

public abstract class LockAwareJob<V> extends Job<V> {

    @Override
    public final void doJob() throws Exception {
        String name = this.getClass().getSimpleName();
        try {
            JobLock lock = JobLock.acquireLock(name);
            if (lock != null) {
                Logger.info("Starting %s", name);
                try {
                    doJobWithLock(lock);
                } finally {
                    if (!JobLock.releaseLock(lock)) {
                        Logger.error("Lock acquired but cannot be released for %s", name);
                    }
                    Logger.info("End of %s", name);
                }
            } else {
                Logger.info("Another node is running %s : nothing to do", name);
            }
        } catch (Exception ex) {
            ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
            throw ex;
        }
    }

    @Override
    public final V doJobWithResult() throws Exception {
        String name = this.getClass().getSimpleName();
        try {
            JobLock lock = JobLock.acquireLock(name);
            if (lock != null) {
                Logger.info("Starting %s", name);
                try {
                    return resultWithLock(lock);
                } finally {
                    if (!JobLock.releaseLock(lock)) {
                        Logger.error("Lock acquired but cannot be released for %s", name);
                    }
                    Logger.info("End of %s", name);
                }
            } else {
                Logger.info("Another node is running %s : nothing to do", name);
                return resultWithoutLock();
            }
        } catch (Exception ex) {
            ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
            throw ex;
        }
    }

    public void doJobWithLock(JobLock lock) throws Exception {
    }

    public V resultWithLock(JobLock lock) throws Exception {
        doJobWithLock(lock);
        return null;
    }

    public V resultWithoutLock() throws Exception {
        return null;
    }
}

In my log4j.properties I added a special line to avoid an error being logged every time an instance fails to acquire the job lock:

log4j.logger.org.hibernate.event.def.AbstractFlushingEventListener=FATAL

With this solution you can also use the JobLock id to store parameters associated with the job (for example, the last run date).

answered 2012-12-10T06:52:58.277

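For simplicity, I would personally run the jobs on a single instance. Alternatively, if you want finer control over execution and better concurrent or parallel processing, you could consider using Akka instead of Jobs.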
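
One way to get the single-instance behavior is a per-node configuration flag that only one server sets. This is a minimal sketch under that assumption; the `jobs.enabled` property and the `JobNodeGate` class are hypothetical, not something Play provides.

```java
import java.util.Properties;

/** Decides from node-local configuration whether this node should run scheduled jobs. */
class JobNodeGate {
    /** Hypothetical property name; set it to "true" on exactly one node. */
    static final String JOBS_ENABLED_KEY = "jobs.enabled";

    /** Jobs are disabled unless the property is explicitly set to "true". */
    static boolean jobsEnabled(Properties config) {
        return Boolean.parseBoolean(config.getProperty(JOBS_ENABLED_KEY, "false"));
    }

    /** Runs the job only on the designated node; returns whether it ran. */
    static boolean runIfEnabled(Properties config, Runnable job) {
        if (!jobsEnabled(config)) {
            return false;
        }
        job.run();
        return true;
    }
}
```

The trade-off is that the designated node becomes a single point of failure for the jobs: if it goes down, nothing runs until the flag is moved to another server.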

answered 2012-12-08T07:39:23.260