我将在下面描述如何在 Alfresco 中实现批处理。在详细介绍之前,我还想建议将此流程与 Activiti 工作流(或 JBPM,如果您愿意)集成。
如后面所述,该进程将发送事件以通知侦听器有关作业的进度。这些事件的监听者可以直接调用 Jenkins。
侦听器可以更新工作流,而不是直接调用 Jenkins。在这种情况下,调用 Jenkins 的逻辑将在工作流任务中实现。这使得将批处理的逻辑与通知程序的逻辑分开变得更加容易。此外,工作流还可用于存储有关工作进度的信息。这些信息最终可以被感兴趣的人/事物轮询。
长时间运行的过程:
我不知道您使用的是什么版本的 Alfresco,我将描述 4.1 版本的解决方案。Alfresco 支持长时间运行的批处理,主要使用包 org.alfresco.repo.batch 中的类和接口:
批处理工作提供者
批处理器
BatchProcessor.BatchProcessWorker
批处理监视器
BatchMonitorEvent.java
您需要提供两个接口的实现:BatchProcessorWorkProvider 和 BatchProcessor.BatchProcessWorker:
两个接口都附在下面。第一个返回工作负载,第二个定义工人是什么。
批处理器:
/**
* An interface that provides work loads to the {@link BatchProcessor}.
*
* @author Derek Hulley
* @since 3.4
*/
public interface BatchProcessWorkProvider<T>
{
/**
* Get an estimate of the total number of objects that will be provided by this instance.
* Instances can provide accurate answers on each call, but only if the answer can be
* provided quickly and efficiently; usually it is enough to to cache the result after
* providing an initial estimate.
*
* @return a total work size estimate
*/
int getTotalEstimatedWorkSize();
/**
* Get the next lot of work for the batch processor. Implementations should return
* the largest number of entries possible; the {@link BatchProcessor} will keep calling
* this method until it has enough work for the individual worker threads to process
* or until the work load is empty.
*
* @return the next set of work object to process or an empty collection
* if there is no more work remaining.
*/
Collection<T> getNextWork();
}
批处理工人:
/**
* An interface for workers to be invoked by the {@link BatchProcessor}.
*/
public interface BatchProcessWorker<T>
{
/**
* Gets an identifier for the given entry (for monitoring / logging purposes).
*
* @param entry
* the entry
* @return the identifier
*/
public String getIdentifier(T entry);
/**
* Callback to allow thread initialization before the work entries are
* {@link #process(Object) processed}. Typically, this will include authenticating
* as a valid user and disbling or enabling any system flags that might affect the
* entry processing.
*/
public void beforeProcess() throws Throwable;
/**
* Processes the given entry.
*
* @param entry
* the entry
* @throws Throwable
* on any error
*/
public void process(T entry) throws Throwable;
/**
* Callback to allow thread cleanup after the work entries have been
* {@link #process(Object) processed}.
* Typically, this will involve cleanup of authentication and resetting any
* system flags previously set.
* <p/>
* This call is made regardless of the outcome of the entry processing.
*/
public void afterProcess() throws Throwable;
}
在实践中,BatchProcessWorkProvider 返回一个“待办工作”(“T”类)的集合。“要做的工作”是您需要提供的课程。在您的情况下,此类可以提供从远程系统中提取文件子集的信息。方法进程将使用此信息来实际完成工作。举个例子,在你的情况下,我们可以调用 T,ImportFiles。
您的 BatchProcessWorkProvider 应将文件列表划分为大小合理的 ImportFiles 集合。
BatchProcessWorker 中“最重要”的方法是
public void process(ImportFiles filesToImport) throws Throwable;
这是您必须实施的方法。对于其他方法,有一个适配器 BatchProcess.BatchProcessWorkerAdapter 提供默认实现。
process 方法接收一个 ImportFiles 作为参数,并可以使用它来查找远程服务器中的文件并导入它们。
最后,您需要实例化一个 BatchProcessor:
try {
final RetryingTransactionHelper retryingTransactionHelper = transactionService.getRetryingTransactionHelper();
BatchProcessor<ImportFiles> batchProcessor = new BatchProcessor<ImportFiles>(processName,
retryingTransactionHelper, workProvider, threads, batchSize,
applicationEventPublisher, logger, loggingInterval);
batchProcessor.process(worker, true);
}
catch (LockAcquisitionException e) {
/* Manage exception */
}
在哪里
processName:对长时间运行的进程的描述
workProvider BatchProcessWorkProvider 的一个实例
线程:工作线程的数量(并行)
batchSize:同一事务中要处理的条目数
logger:用于报告进度的记录器
loggingInterval:在报告进度之前要处理的条目数
retryingTransactionHelper:是在并发更新(乐观锁定)或死锁条件失败时重试事务的帮助器类。
applicationEventPublisher:这是 Spring ApplicationEventPublisher 的一个实例,通常(也适用于 Alfresco)是 Spring ApplicationContext。
要将事件发送到 Jenkins,您可以使用 applicationEventPublisher。以下链接描述了如何使用它。它是 Spring 的标准功能。
春季活动
例如,事件可以由方法发送
process(ImportFiles filesToImport)
如上所述。