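I am planning to deploy an application with Play and have never used its "Jobs" before. My deployment is large enough to need several load-balanced Play servers, but my computations are not big enough to need Hadoop/Storm/others.
My question is: how do I handle this situation in Play? If I set up a job in Play to run every minute, I don't want every server to do exactly the same thing at the same time.
I could only find this answer, but I don't like any of its options.
So, are there any tools or best practices for coordinating jobs, or do I have to build something from scratch?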
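You can use a database flag, as described in Pere Villega's post on Playframework concurrent job management (the post covers two jobs).
But I think the best solution is the one Guillaume Bort posted on Google Groups, which uses Memcache. There seems to be a module for Play 2: https://github.com/mumoshu/play2-memcached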
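The general idea behind a cache-based lock can be sketched without a running memcached server. memcached's `add` command only stores a key if it is not already present, so the node whose `add` succeeds is the one that runs the job. In this minimal sketch, `InMemoryAddOnlyCache`, `runIfLeader` and the key format `lock:<jobName>` are hypothetical names. The in-memory map only stands in for a shared memcached instance: a real deployment would go through a memcached client (such as the module above, whose API is not shown here). This is not necessarily how Guillaume Bort's post implements it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CacheLockSketch {

    /** Value stored under the lock key: who holds it and until when. */
    public static final class Entry {
        final String owner;
        final long expiresAtMillis;

        Entry(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    /** Hypothetical in-memory stand-in for a memcached instance shared by all nodes. */
    public static final class InMemoryAddOnlyCache {
        private final ConcurrentMap<String, Entry> map = new ConcurrentHashMap<>();

        /** Like memcached "add": succeeds only if the key is absent or expired. */
        public boolean add(String key, String owner, long ttlMillis, long nowMillis) {
            Entry fresh = new Entry(owner, nowMillis + ttlMillis);
            // compute() runs atomically for a given key in ConcurrentHashMap
            Entry winner = map.compute(key, (k, current) ->
                    current == null || current.expiresAtMillis <= nowMillis ? fresh : current);
            return winner == fresh;
        }

        /** Removes the key only if this owner still holds it (a simplification). */
        public void release(String key, String owner) {
            map.computeIfPresent(key, (k, current) -> current.owner.equals(owner) ? null : current);
        }
    }

    /** Runs the task on this node only if it wins the add() race; returns whether it ran. */
    public static boolean runIfLeader(InMemoryAddOnlyCache cache, String jobName, String node,
                                      long ttlMillis, long nowMillis, Runnable task) {
        String key = "lock:" + jobName;
        if (!cache.add(key, node, ttlMillis, nowMillis)) {
            return false; // another node holds the lock
        }
        try {
            task.run();
        } finally {
            cache.release(key, node);
        }
        return true;
    }

    public static void main(String[] args) {
        InMemoryAddOnlyCache cache = new InMemoryAddOnlyCache();
        long now = System.currentTimeMillis();
        boolean ran = runIfLeader(cache, "ReportJob", "node-a", 60_000, now,
                () -> System.out.println("node-a runs ReportJob"));
        System.out.println("node-a ran: " + ran);
    }
}
```

The TTL is the important design choice: if a node dies while holding the key, the entry expires and another node can take over on a later run, so the lock does not stay stuck. Choose a TTL longer than the job's normal running time.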
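You can use a table in the database to store a job lock, but you have to check and update this lock in a separate transaction (for that you have to use JPA.newEntityManager).
My JobLock class uses a LockMode enum: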
```java
package enums;

public enum LockMode {
    FREE, ACQUIRED;
}
```
Here is the JobLock class:
```java
package models;

import java.util.Date;
import java.util.List;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Version;

import play.Logger;
import play.Play;
import play.data.validation.Required;
import play.db.jpa.JPA;
import play.db.jpa.Model;
import utils.Parser;
import enums.LockMode;
import exceptions.ServiceException;

/**
 * Technical class that manages a lock in the database, so that multiple
 * instances are synchronized and cannot run the same job at the same time.
 *
 * @author sebastien
 */
@Entity
public class JobLock extends Model {

    // Maximum time (in ms) a lock may be held before it is treated as a problem
    private static final Long MAX_ACQUISITION_DELAY = Parser.parseLong(Play.configuration.getProperty(
            "job.lock.acquisitiondelay", "10000"));

    // Unique, so that two nodes cannot both insert the first lock row for a job
    @Required
    @Column(unique = true)
    public String jobName;

    public Date acquisitionDate;

    @Required
    @Enumerated(EnumType.STRING)
    public LockMode lockMode;

    // Optimistic locking: the second of two concurrent updates fails on flush
    @Version
    public int version;

    // STATIC METHODS
    // ~~~~~~~~~~~~~~~

    /**
     * Acquires the lock for the type of job identified by the name parameter.
     * The lock is acquired in a separate transaction, so that this transaction
     * is as short as possible and other instances see the acquisition sooner.
     * <p>
     * Otherwise, the other instances would be blocked until the instance that
     * acquired the lock has finished its business transaction, which can take
     * a long time for a job.
     * </p>
     *
     * @param name
     *            the name that identifies a job category, usually the job's
     *            simple class name
     * @return the lock object if the acquisition is successful, null otherwise
     */
    public static JobLock acquireLock(String name) {
        EntityManager em = JPA.newEntityManager();
        try {
            em.getTransaction().begin();
            List<JobLock> locks = em.createQuery("from JobLock where jobName=:name", JobLock.class)
                    .setParameter("name", name).setMaxResults(1).getResultList();
            JobLock lock = locks != null && !locks.isEmpty() ? locks.get(0) : null;
            if (lock == null) {
                // First run of this job: create the lock row already acquired
                lock = new JobLock();
                lock.jobName = name;
                lock.acquisitionDate = new Date();
                lock.lockMode = LockMode.ACQUIRED;
                em.persist(lock);
            } else {
                if (LockMode.ACQUIRED.equals(lock.lockMode)) {
                    // Held by another node; held for too long means something is wrong
                    if ((System.currentTimeMillis() - lock.acquisitionDate.getTime()) > MAX_ACQUISITION_DELAY) {
                        throw new ServiceException(String.format(
                                "Lock is held for too much time : there is a problem with job %s", name));
                    }
                    return null;
                }
                lock.lockMode = LockMode.ACQUIRED;
                lock.acquisitionDate = new Date();
                lock.willBeSaved = true;
            }
            em.flush();
            em.getTransaction().commit();
            return lock;
        } catch (Exception e) {
            // Do not log the exception here: in a multi-node installation it is
            // normal to get one, this is how multiple job executions are avoided
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
            // Maybe the test should be inverted, to define which exceptions are
            // not problematic: exceptions that denote concurrency in the
            // database are normal
            if (e instanceof ServiceException) {
                throw (ServiceException) e;
            } else {
                return null;
            }
        } finally {
            if (em.isOpen()) {
                em.close();
            }
        }
    }

    /**
     * Releases the lock in the database so that another instance can take it.
     * This changes {@link #lockMode} to FREE and sets {@link #acquisitionDate}
     * to null. It is done in a separate transaction, which sees what happened
     * in the database while the business transaction was running.
     *
     * @param lock
     *            the lock to release
     * @return true if the lock was released, false otherwise
     */
    public static boolean releaseLock(JobLock lock) {
        if (lock == null || LockMode.FREE.equals(lock.lockMode)) {
            return false;
        }
        // Open the EntityManager only when there is something to release,
        // so that the early return above cannot leak it
        EntityManager em = JPA.newEntityManager();
        try {
            em.getTransaction().begin();
            // Reload the lock so that it is managed by this EntityManager
            lock = em.find(JobLock.class, lock.id);
            lock.lockMode = LockMode.FREE;
            lock.acquisitionDate = null;
            lock.willBeSaved = true;
            em.persist(lock);
            em.flush();
            em.getTransaction().commit();
            return true;
        } catch (Exception e) {
            if (em.getTransaction().isActive()) {
                em.getTransaction().rollback();
            }
            Logger.error(e, "Error during commit of lock release");
            return false;
        } finally {
            if (em.isOpen()) {
                em.close();
            }
        }
    }
}
```
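Exclusivity in `acquireLock` comes from the `@Version` field. When two nodes read the same FREE row and both try to set it to ACQUIRED, the JPA provider makes each update conditional on the version it read, so the second flush fails with an optimistic-locking exception, which `acquireLock` catches and turns into `null`. The following framework-free simulation isolates that check. It uses no database, and `LockRow`, `Snapshot` and `tryAcquire` are hypothetical names. It only covers updates of an existing row: the very first insert for a job name needs a unique constraint on `jobName` to be equally safe.

```java
/**
 * Plain-Java simulation of the optimistic check behind JobLock's @Version field.
 * A JPA provider issues, roughly,
 *   UPDATE JobLock SET lockMode = ?, version = ? WHERE id = ? AND version = ?
 * and treats "0 rows updated" as a concurrent modification.
 */
public class VersionCheckSketch {

    public enum LockMode { FREE, ACQUIRED }

    /** What a node saw when it read the row. */
    public static final class Snapshot {
        public final LockMode mode;
        public final int version;

        public Snapshot(LockMode mode, int version) {
            this.mode = mode;
            this.version = version;
        }
    }

    /** Hypothetical in-memory stand-in for one JobLock row. */
    public static final class LockRow {
        private LockMode mode = LockMode.FREE;
        private int version = 0;

        public synchronized Snapshot read() {
            return new Snapshot(mode, version);
        }

        /** Returns the number of "rows updated": 1 on success, 0 if the version moved. */
        public synchronized int update(LockMode newMode, int expectedVersion) {
            if (version != expectedVersion) {
                return 0;
            }
            mode = newMode;
            version++;
            return 1;
        }
    }

    /** Same shape as acquireLock: check the mode that was read, then update conditionally. */
    public static boolean tryAcquire(LockRow row, Snapshot seen) {
        if (seen.mode != LockMode.FREE) {
            return false; // another node holds the lock
        }
        return row.update(LockMode.ACQUIRED, seen.version) == 1;
    }

    public static void main(String[] args) {
        LockRow row = new LockRow();
        // Both nodes read the row before either of them writes
        Snapshot nodeA = row.read();
        Snapshot nodeB = row.read();
        System.out.println("node A acquired: " + tryAcquire(row, nodeA)); // node A acquired: true
        System.out.println("node B acquired: " + tryAcquire(row, nodeB)); // node B acquired: false
    }
}
```

Both nodes saw FREE, but only the first conditional update matches the version, which is why the losing node gets an exception (and `null`) instead of a second lock.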
And here is the LockAwareJob that uses this lock:
```java
package jobs;

import models.JobLock;
import notifiers.ExceptionMails;
import play.Logger;
import play.jobs.Job;

/**
 * Base class for jobs that must run on only one node at a time: the job body
 * runs only if this node acquires the JobLock named after the job class.
 */
public abstract class LockAwareJob<V> extends Job<V> {

    @Override
    public final void doJob() throws Exception {
        String name = this.getClass().getSimpleName();
        try {
            JobLock lock = JobLock.acquireLock(name);
            if (lock != null) {
                Logger.info("Starting %s", name);
                try {
                    doJobWithLock(lock);
                } finally {
                    // Always release, even if the job body failed
                    if (!JobLock.releaseLock(lock)) {
                        Logger.error("Lock acquired but cannot be released for %s", name);
                    }
                    Logger.info("End of %s", name);
                }
            } else {
                Logger.info("Another node is running %s : nothing to do", name);
            }
        } catch (Exception ex) {
            ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
            throw ex;
        }
    }

    @Override
    public final V doJobWithResult() throws Exception {
        String name = this.getClass().getSimpleName();
        try {
            JobLock lock = JobLock.acquireLock(name);
            if (lock != null) {
                Logger.info("Starting %s", name);
                try {
                    return resultWithLock(lock);
                } finally {
                    // Always release, even if the job body failed
                    if (!JobLock.releaseLock(lock)) {
                        Logger.error("Lock acquired but cannot be released for %s", name);
                    }
                    Logger.info("End of %s", name);
                }
            } else {
                Logger.info("Another node is running %s : nothing to do", name);
                return resultWithoutLock();
            }
        } catch (Exception ex) {
            ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
            throw ex;
        }
    }

    /** Override to do the actual work; only called while this node holds the lock. */
    public void doJobWithLock(JobLock lock) throws Exception {
    }

    /** Override to return a result; by default delegates to doJobWithLock. */
    public V resultWithLock(JobLock lock) throws Exception {
        doJobWithLock(lock);
        return null;
    }

    /** Result returned when another node holds the lock. */
    public V resultWithoutLock() throws Exception {
        return null;
    }
}
```
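The control flow of `LockAwareJob` (acquire, run the body, release in `finally`, fall back to `resultWithoutLock` when another node holds the lock) can be exercised without Play. This is a minimal sketch in which `LockStore`, `InMemoryLockStore`, `LockAwareTask`, `ReportJob` and `FailingJob` are hypothetical stand-ins: the in-memory store replaces the database-backed `JobLock` (so it only coordinates threads in one JVM, not separate servers), and exception mailing is left out.

```java
import java.util.HashSet;
import java.util.Set;

public class LockAwareTaskSketch {

    /** Hypothetical lock abstraction standing in for JobLock. */
    public interface LockStore {
        boolean acquire(String name);
        void release(String name);
    }

    /** Single-process stand-in; real nodes need storage they all share. */
    public static final class InMemoryLockStore implements LockStore {
        private final Set<String> held = new HashSet<>();

        @Override
        public synchronized boolean acquire(String name) {
            return held.add(name); // false if the name is already held
        }

        @Override
        public synchronized void release(String name) {
            held.remove(name);
        }

        public synchronized boolean isHeld(String name) {
            return held.contains(name);
        }
    }

    public abstract static class LockAwareTask<V> {
        private final LockStore store;

        protected LockAwareTask(LockStore store) {
            this.store = store;
        }

        /** Final entry point, like doJobWithResult(): subclasses only supply the body. */
        public final V run() throws Exception {
            String name = getClass().getSimpleName();
            if (!store.acquire(name)) {
                return resultWithoutLock(); // another node is running this job
            }
            try {
                return resultWithLock();
            } finally {
                store.release(name); // released even if the body throws
            }
        }

        protected abstract V resultWithLock() throws Exception;

        protected V resultWithoutLock() {
            return null;
        }
    }

    public static final class ReportJob extends LockAwareTask<Integer> {
        public ReportJob(LockStore store) { super(store); }

        @Override
        protected Integer resultWithLock() { return 42; }
    }

    public static final class FailingJob extends LockAwareTask<Integer> {
        public FailingJob(LockStore store) { super(store); }

        @Override
        protected Integer resultWithLock() { throw new IllegalStateException("job body failed"); }
    }

    public static void main(String[] args) throws Exception {
        InMemoryLockStore store = new InMemoryLockStore();
        System.out.println(new ReportJob(store).run()); // 42
        store.acquire("ReportJob"); // pretend another node holds the lock
        System.out.println(new ReportJob(store).run()); // null
    }
}
```

Making `run()` final mirrors the `final` modifiers on `doJob()` and `doJobWithResult()`: a subclass cannot bypass the lock by overriding the entry point and only provides the body.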
In my log4j.properties I added a special line so that an error is not logged every time an instance fails to acquire a job lock:

```properties
log4j.logger.org.hibernate.event.def.AbstractFlushingEventListener=FATAL
```
With this solution you can also use the JobLock entity (identified by its id) to store parameters related to the job, such as the last run date.
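For simplicity, I would personally run the jobs on a single instance. Alternatively, if you want finer control over execution and better concurrency and parallel processing, you could consider using Akka instead of Jobs.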