我有一个 ArrayBlocking 队列,单线程固定速率 Scheduled 在该队列上工作。我可能有失败的任务。我想以高优先级或最高级别重新运行或重新插入队列


2 回答 2


这里有一些想法 -
你为什么使用 ArrayBlockingQueue 而不是PriorityBlockingQueue?听起来像你需要我。首先将所有元素设置为同等优先级。
如果您收到异常 - 重新插入具有更高优先级的队列

于 2012-06-22T05:20:18.147 回答

最简单的事情可能是优先级队列。将重试编号附加到任务。它从零开始。运行不成功后,丢弃所有的 1 并增加 0 并将它们以高优先级放回队列中。使用此方法,您可以轻松地决定将所有内容运行 3 次,或者更多,如果您想稍后再运行。不利的一面是您必须修改任务类。


要解决此问题,您必须自己进行阻止。两个队列都应该是非阻塞的。(建议: java.util.concurrent.ConcurrentLinkedQueue。)在没有结果的情况下轮询两个队列后,wait()在监视器上。当任何东西放入队列时,它应该调用notifyAll()并且调度程序可以再次启动。需要非常小心,以免在调度程序检查两个队列之后但在它调用之前发生通知wait()



private final ConcurrentLinkedQueue  mainQueue = new ConcurrentLinkedQueue();
private final ConcurrentLinkedQueue  prioQueue = new ConcurrentLinkedQueue();
private final Object  entryWatch = new Object();

/** Adds a new job to the queue. */
public void addjob( Runnable runjob )  {
    synchronized (entryWatch)  { entryWatch.notifyAll(); }
/** The endless loop that does the work. */
public void schedule()  {
    for (;;)  {
        Runnable  run = getOne();  // Avoids lock if successful.
        if (run == null)  {
            // Both queues are empty.
            synchronized (entryWatch)  {
                // Need to check again.  Someone might have added and notifiedAll
                // since last check. From this point until, wait, we can be sure
                // entryWatch is not notified.
                run = getOne();
                if (run == null)  {
                    // Both queues are REALLY empty.
                    try  { entryWatch.wait(); }
                    catch (InterruptedException ie)  {}
        runit( run );
/** Helper method for the endless loop. */
private Runnable getOne()  {
    Runnable  run = (Runnable) prioQueue.poll();
    if (run != null)  return run;
    return (Runnable) mainQueue.poll();
/** Runs a new job. */
public void runit( final Runnable runjob )  {
    // Do everthing in another thread.  (Optional)
    new Thread()  {
        @Override public void run()  {
            // Run run.  (Possibly in own thread?)
            // (Perhaps best in thread from a thread pool.)
            // Handle failure (runit only, NOT in runitLast).
            //  Defining "failure" left as exercise for reader.
            if (failure)  {
                // Put code here to handle failure.
                // Put back in queue.
                prioQueue.add( runjob );
                synchronized (entryWatch)  { entryWatch.notifyAll(); }
/** Reruns a job. */
public void runitLast( final Runnable runjob )  {
     // Same code as "runit", but don't put "runjob" in "prioQueue" on failure.
于 2012-06-22T16:00:08.007 回答