16

这是我的用例。

旧系统更新数据库队列表 QUEUE。

我想要一个计划的重复作业 - 检查 QUEUE 的内容 - 如果表中有行,它会锁定该行并做一些工作 - 删除 QUEUE 中的行

如果前一个作业仍在运行,则将创建一个新线程来完成该工作。我想配置最大并发线程数。

我正在使用 Spring 3,我当前的解决方案是执行以下操作(使用 1 毫秒的固定速率来使线程基本上连续运行)

@Scheduled(fixedRate = 1)
@Async
public void doSchedule() throws InterruptedException {
    log.debug("Start schedule");
    publishWorker.start();
    log.debug("End schedule");
}

<task:executor id="workerExecutor" pool-size="4" />

这直接创建了 4 个线程,并且线程正确地共享了队列中的工作负载。但是,当线程需要很长时间才能完成时,我似乎遇到了内存泄漏。

java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0                              |              80 |   373,410,496 |     89.74%
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940                          |           48 |   373,410,136 |     89.74%
|  |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68  

所以

1:我应该同时使用@Async 和@Scheduled 吗?

2:如果不是那么我还能如何使用spring来满足我的要求?

3:如何只在其他线程忙时才创建新线程?

谢谢大家!

编辑:我认为工作队列变得无限长......现在使用

    <task:executor id="workerExecutor"
    pool-size="1-4"
    queue-capacity="10" rejection-policy="DISCARD" />

将报告结果

4

2 回答 2

4

你可以试试

  1. 运行一个延迟一秒的调度程序,这将锁定并获取到目前为止尚未锁定的所有 QUEUE 记录。
  2. 对于每条记录,调用一个异步方法,该方法将处理该记录并删除它。
  3. 执行器的拒绝策略应该是 ABORT,以便调度程序可以解锁尚未发出处理的 QUEUE。这样调度程序可以在下次运行时再次尝试处理这些 QUEUE。

当然,您必须处理调度程序已锁定 QUEUE 的情况,但无论出于何种原因,处理程序都没有完成对它的处理。

伪代码:

public class QueueScheduler {
    @AutoWired
    private QueueHandler queueHandler;

    @Scheduled(fixedDelay = 1000)
    public void doSchedule() throws InterruptedException {
        log.debug("Start schedule");
        List<Long> queueIds = lockAndFetchAllUnlockedQueues();
        for (long id : queueIds)
            queueHandler.process(id);
        log.debug("End schedule");
    }
}

public class QueueHandler {

    @Async
    public void process(long queueId) {
        // process the QUEUE & delete it from DB
    }
}
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10"
     rejection-policy="ABORT"/>
于 2014-08-27T08:27:56.370 回答
1
//using a fixedRate of 1 millisecond to get the threads to run basically continuously
@Scheduled(fixedRate = 1)

当您使用@Scheduled新线程时,将创建一个新线程,并将doSchedule 在 1 毫秒时以指定的 fixedRate 调用方法。当您运行您的应用程序时,您已经可以看到 4 个线程竞争 QUEUE 表,并且可能出现死锁。

通过线程转储调查是否存在死锁。 http://helpx.adobe.com/cq/kb/TakeThreadDump.html

@Async 注解在这里没有任何用处。

实现这一点的更好方法是通过实现可运行并将您的类传递给TaskExecutor所需数量的线程来将您的类创建为线程。

使用 Spring 线程和 TaskExecutor,我如何知道线程何时完成?

还要检查您的设计,它似乎没有正确处理同步。如果前一个作业正在运行并持有该行的锁,那么您创建的下一个作业仍会看到该行并等待获取该特定行的锁。

于 2013-10-16T05:19:22.187 回答