0

我有以下设计:

有一个扩展的任务,TimerTask它计划每分钟运行一次。此任务将尝试从中央队列(作为单个消费者)获取项目并将其表示写入文件。

此外,还有多个生产者不时将项目放入中央队列。

我感兴趣的是,每次执行任务时(run() method executed),如果有项目,它将从队列中提取所有项目,如果没有项目则什么也不做。

如果队列已满,生产者应该在队列上休眠。

我对这个问题的解决方案是:

创建扩展 TimerTask 的 ExtractTask。ExtractTask 将包含一个 BlockingQueue。每个生产者都将通过执行 getQueue() 方法接收对队列实例的引用。生产者将执行 BlockingQueue.put() 方法。消费者将在 run() 中执行 BlockingQueue.poll() 方法。

你能推荐一个更好的设计吗?我的设计是否包含任何有问题的场景案例?此设计可能遇到的任何同步问题?

4

5 回答 5

2

我会:

  • 将队列与设计中的任务分开,
  • 注入队列而不是进行查找,
  • 使用 SchedulerService 而不是 TimerTask

除此之外,你已经得到它。

如果您愿意冒险依赖 Spring,您应该研究Spring Integration。您描述的所有组件都在那里。你也可以使用许多其他框架来解决这个问题,比如 Camel 或 Akka;我的主要观点是,如果您不是绝对必须自己维护此代码,则不要自己维护。

免责声明:我对 Spring 集成有些偏见

于 2012-10-22T15:09:27.643 回答
1

设计看起来不错。这里没有太多细节,所以很难确定。我建议将所有依赖项注入计时器任务。

此外,您可能无需太多自定义代码就可以在 Apache Camel 中实现这一目标。见https://camel.apache.org/timer.html

于 2012-10-22T15:07:32.613 回答
1

既然你问过设计,我建议几件事:

  • 我个人会通过 Timer Task 获得 Executor 服务。看看这里。使用 executor 确保您可以在未来需求发生变化时在单独的线程中执行任务。
  • 尝试将您的队列与任务对象分开。
  • 通常在代码中使用 DI 以便其可测试。
  • 我会让生产者在他们的构造函数中接收队列。
于 2012-10-22T15:20:43.423 回答
1

根据您的设计,我可以在下面想到类似的内容。ConsumerTask 可以使用泛型,但我很难弄清楚如何对 Producer 线程做同样的事情。生产者和消费者都对生产/消费的物品数量有限制。从 TimerTask 逻辑中,取消 TimerTask 本身的 run() 方法中的计时器以使其停止是必不可少的。在这种情况下,只能使用 POISON PILL 方法来关闭。如果您使用 Executors.newSingleThreadExecutor() 或 scheduleThreadPoolExecutor(),则可以使用 shutdown() 和 shutdownNow() 方法来停止生产者或消费者。虽然 TimerTask 是检查 ConcurrentQueue 工作的一个很好的例子,但它不会在生产系统中使用。

编辑 向生产者线程添加通用功能。构造函数现在采用一个模板类,该类实现将项目添加到队列的方法。我已经定义了一个抽象类 AddItem,其中包含和 addItem() 方法,只要生产者想要将项目添加到队列中,就会调用该方法。

import java.util.Date;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerTask<T> extends TimerTask {
    Timer timer;
    ConcurrentLinkedQueue<T> itemQueue;
    AtomicLong count = new AtomicLong(0);
    final long limit;

    public ConsumerTask(ConcurrentLinkedQueue<T> itemQ, long lim, int seconds) {
        limit = lim;
        timer = new Timer();
        timer.scheduleAtFixedRate(this, new Date(), seconds * 1000);
        itemQueue = itemQ;
    }

    public void run() {
        T item = itemQueue.peek();
        if (item != null) {
            if (count.incrementAndGet() <= limit) {
                System.out.println("Extracting Item : " + itemQueue.poll());
            } else {
                System.out
                        .println("Consumed : " + (count.get() - 1) + " items");
                timer.cancel();
            }

        }
    }

    public static void main(String args[]) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> itemQ = new ConcurrentLinkedQueue<Integer>();
        ConsumerTask<Integer> ct = new ConsumerTask<Integer>(itemQ, 10, 1);

        new Thread(new Producer<Integer>(itemQ, new IntegerAddItem(itemQ), 20))
                .start();
        new Thread(ct).start();

    }
}

abstract class AddItem<T> {
    ConcurrentLinkedQueue<T> itemQ;
    T t;

    public AddItem(ConcurrentLinkedQueue<T> itemQ) {
        this.itemQ = itemQ;
    }

    abstract boolean addItem();

    public boolean addItem(T t) {
        return itemQ.add(t);
    }
}

class IntegerAddItem extends AddItem<Integer> {
    public IntegerAddItem(ConcurrentLinkedQueue<Integer> itemQ) {
        super(itemQ);
    }

    AtomicInteger item = new AtomicInteger(0);

    @Override
    boolean addItem() {
        return addItem(item.incrementAndGet());
    }

}

class Producer<T> implements Runnable {
    private final ConcurrentLinkedQueue<T> itemQueue;
    AtomicInteger item = new AtomicInteger(0);
    AtomicLong count = new AtomicLong(0);
    AddItem<T> addMethod;
    final long limit;

    public Producer(ConcurrentLinkedQueue<T> itemQ, AddItem<T> addMethod,
            long limit) {
        itemQueue = itemQ;
        this.limit = limit;
        this.addMethod = addMethod;
    }

    public void run() {
        while (count.getAndIncrement() < limit) {
            addMethod.addItem();
            try {
                Thread.sleep(new Random().nextInt(5000));
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                Thread.currentThread().interrupt();
            }

        }
    }
}
于 2012-10-22T16:18:38.370 回答
0

您说消费者将在执行计时器时提取所有项目。

您应该注意从队列中提取所有项目的操作不是阻塞操作,它是poll()阻塞方法调用的重复,这意味着在提取项目时生产者将能够将项目添加到队列中。

于 2012-10-22T15:17:36.550 回答