2

我们正在 Alfresco(基于 Java 的)中创建一个 Web 脚本。在一定的时间间隔,这个 webscript 应该从远程系统下载文件(大量文件)并处理它们并在 Alfresco 中对文件进行版本控制。

现在,这个 Web 脚本将从 Jenkins 盒子中触发,所以我们计划轮询这个 Web 脚本的状态是否完成,直到整个过程完成。这将每天或每周定期发生。

如何使 webscript 定期向 Jenkins 作业发送中间响应并继续处理。一旦所有进程完成,同一个 webscript 调用应该将完成状态发送到 jenkin 框。

我怎样才能做到这一点?

注意:我不能使用 Cron。只能使用 Jenkins 作为 webscript 的输入,将从 Jenkins 发送(从不同的产品接收)。

4

2 回答 2

3

我将在下面描述如何在 Alfresco 中实现批处理。在详细介绍之前,我还想建议将此流程与 Activiti 工作流(或 JBPM,如果您愿意)集成。

如后面所述,该进程将发送事件以通知侦听器有关作业的进度。这些事件的监听者可以直接调用 Jenkins。

侦听器可以更新工作流,而不是直接调用 Jenkins。在这种情况下,调用 Jenkins 的逻辑将在工作流任务中实现。这使得将批处理的逻辑与通知程序的逻辑分开变得更加容易。此外,工作流还可用于存储有关工作进度的信息。这些信息最终可以被感兴趣的人/事物轮询。

长时间运行的过程:

我不知道您使用的是什么版本的 Alfresco,我将描述 4.1 版本的解决方案。Alfresco 支持长时间运行的批处理,主要使用包 org.alfresco.repo.batch 中的类和接口:

批处理工作提供者

批处理器

BatchProcessor.BatchProcessWorker

批处理监视器

BatchMonitorEvent.java

您需要提供两个接口的实现:BatchProcessorWorkProvider 和 BatchProcessor.BatchProcessWorker:

两个接口都附在下面。第一个返回工作负载,第二个定义工人是什么。

批处理器

/**
 * An interface that provides work loads to the {@link BatchProcessor}.
 * 
 * @author Derek Hulley
 * @since 3.4
 */
public interface BatchProcessWorkProvider<T>
{
    /**
     * Get an estimate of the total number of objects that will be provided by this instance.
     * Instances can provide accurate answers on each call, but only if the answer can be
     * provided quickly and efficiently; usually it is enough to to cache the result after
     * providing an initial estimate.
     * 
     * @return                  a total work size estimate
     */
    int getTotalEstimatedWorkSize();
    
    /**
     * Get the next lot of work for the batch processor.  Implementations should return
     * the largest number of entries possible; the {@link BatchProcessor} will keep calling
     * this method until it has enough work for the individual worker threads to process
     * or until the work load is empty.
     * 
     * @return                  the next set of work object to process or an empty collection
     *                          if there is no more work remaining.
     */
    Collection<T> getNextWork();
}

批处理工人

/**
 * An interface for workers to be invoked by the {@link BatchProcessor}.
 */
public interface BatchProcessWorker<T>
{
    /**
     * Gets an identifier for the given entry (for monitoring / logging purposes).
     * 
     * @param entry
     *            the entry
     * @return the identifier
     */
    public String getIdentifier(T entry);

    /**
     * Callback to allow thread initialization before the work entries are
     * {@link #process(Object) processed}.  Typically, this will include authenticating
     * as a valid user and disbling or enabling any system flags that might affect the
     * entry processing.
     */
    public void beforeProcess() throws Throwable;

    /**
     * Processes the given entry.
     * 
     * @param entry
     *            the entry
     * @throws Throwable
     *             on any error
     */
    public void process(T entry) throws Throwable;

    /**
     * Callback to allow thread cleanup after the work entries have been
     * {@link #process(Object) processed}.
     * Typically, this will involve cleanup of authentication and resetting any
     * system flags previously set.
     * <p/>
     * This call is made regardless of the outcome of the entry processing.
     */
    public void afterProcess() throws Throwable;
}

在实践中,BatchProcessWorkProvider 返回一个“待办工作”(“T”类)的集合。“要做的工作”是您需要提供的课程。在您的情况下,此类可以提供从远程系统中提取文件子集的信息。方法进程将使用此信息来实际完成工作。举个例子,在你的情况下,我们可以调用 T,ImportFiles。

您的 BatchProcessWorkProvider 应将文件列表划分为大小合理的 ImportFiles 集合。

BatchProcessWorker 中“最重要”的方法是

public void process(ImportFiles filesToImport) throws Throwable;

这是您必须实施的方法。对于其他方法,有一个适配器 BatchProcess.BatchProcessWorkerAdapter 提供默认实现。

process 方法接收一个 ImportFiles 作为参数,并可以使用它来查找远程服务器中的文件并导入它们。

最后,您需要实例化一个 BatchProcessor:

try {
    final RetryingTransactionHelper retryingTransactionHelper = transactionService.getRetryingTransactionHelper();
    BatchProcessor<ImportFiles> batchProcessor = new BatchProcessor<ImportFiles>(processName,
            retryingTransactionHelper, workProvider, threads, batchSize,
            applicationEventPublisher, logger, loggingInterval);
    batchProcessor.process(worker, true);
} 
catch (LockAcquisitionException e) {
    /* Manage exception */
} 

在哪里

processName:对长时间运行的进程的描述

workProvider BatchProcessWorkProvider 的一个实例

线程:工作线程的数量(并行)

batchSize:同一事务中要处理的条目数

logger:用于报告进度的记录器

loggingInterval:在报告进度之前要处理的条目数

retryingTransactionHelper:是在并发更新(乐观锁定)或死锁条件失败时重试事务的帮助器类。

applicationEventPublisher:这是 Spring ApplicationEventPublisher 的一个实例,通常(也适用于 Alfresco)是 Spring ApplicationContext。

要将事件发送到 Jenkins,您可以使用 applicationEventPublisher。以下链接描述了如何使用它。它是 Spring 的标准功能。

春季活动

例如,事件可以由方法发送

process(ImportFiles filesToImport)

如上所述。

于 2016-01-18T01:09:25.170 回答
1

我不会争论你选择一个 webscript 来实现你的逻辑,尽管我不是 100% 同意它。

至于您的问题,您可以将作业/逻辑执行的整体进度状态存储在某个单例中,并让另一个 wesbcript(或只是具有不同参数的同一个)为您返回该值。

于 2016-01-14T23:48:11.250 回答