0

在这种情况下(任何版本),我试图找出 Java 中的最佳方法来实现单个生产者多个消费者,其中我使用 ExecutorService(根据偏好,不需要)生产者需要“永远”运行但每次它运行时,它需要等待所有内容都被完全处理完,例如所有消费者线程都已终止,队列为空,并且没有剩余的项目要生产。生产者也应该只以固定的时间间隔轮询其数据源。

举个例子:我希望我的生产者每 30 分钟轮询一次它的数据源以获取记录并将它们提交到队列中。如果消费者的处理时间超过 30 分钟,我希望生产者等到所有项目都处理完毕后再再次轮询其数据源(自 30 分钟过去后它会立即这样做)。

不是在找人为我写代码。一些基本的提示/指导将不胜感激。

这是我正在尝试使用的缩短示例实现。我已经采取了所有可怕的尝试来解决这个问题。请注意,用于构造 ThreadPoolExecutor 的硬编码参数最终将是动态的。

import java.util.concurrent.*;
public class ItemProcessorService {
    public static void main(String args[]) throws InterruptedException {
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        int corePoolSize = 5,
                maxPoolSize = 10,
                keepAlive = 10,
                queueCapacity = 1;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);

        for (int i = 0; i < 10; i++) {
            executor.execute(new ItemConsumer());
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Executor finished");
    }
}

class ItemConsumer implements Runnable {
    @Override
    public void run() {
        processItem();
    }

    private void processItem() {
        try {
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + " - processed item");
        } catch (InterruptedException e) {
            //e.printStackTrace();
        }
    }
}

class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " - rejected");
    }
}
4

3 回答 3

1

为未来的任务寻找线程池执行器。它会让你的生产者在所有线程完成之前等待。下面是示例:

 for (int job = 0; job < yourList.size(); job++) {
        Future<String[]> future = Service.getCompletionService().take();

        if (future.isDone()) {
        }
    }
于 2014-08-22T21:08:12.200 回答
0

使用结合 CountDownLatch的自调度生产者(另请参见此处)。我知道你不需要这些代码,但我认为在这种情况下,有一些代码来试验并没有什么坏处:

import java.util.*;
import java.util.concurrent.*;

public class WaitForConsumers {

public static void main(String[] args) {

    try {
        new WaitForConsumers().demo();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static final int NUMBER_OF_SLEEP_TASKS = 100;

public void demo() throws Exception {

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    Producer p = new Producer(scheduler, 1000L);
    p.start();
    try {
        Thread.sleep(3000);
        p.stop();
        System.out.println("Stopped.");
    } finally {
        scheduler.shutdownNow();
    }
}

public static List<Long> produce() {
    List<Long> items = new ArrayList<Long>();
    for (int i = 0; i < NUMBER_OF_SLEEP_TASKS; i++) items.add(i * 1L);
    return items;
}

public static void consume(List<Runnable> tasks, CountDownLatch consumersRunning) throws Exception {

    ExecutorService executor = Executors.newCachedThreadPool();
    for (Runnable r : tasks) executor.execute(r);
    try {
        consumersRunning.await();
    } finally {
        executor.shutdownNow();
    }
}

class Producer implements Runnable {

    ScheduledExecutorService scheduler;
    long frequencyMs;
    volatile boolean stop;
    ScheduledFuture<?> producerTask;

    Producer(ScheduledExecutorService scheduler, long frequencyMs) {
        this.scheduler = scheduler;
        this.frequencyMs = frequencyMs;
    }

    void start() {
        scheduler.execute(this);
    }

    void stop() {

        System.out.println("Stopping producer.");
        stop = true;
        if (producerTask != null) {
            producerTask.cancel(true);
        }
    }

    @Override public void run() {

        long startTime = System.currentTimeMillis();
        List<Long> items = produce();
        CountDownLatch consumersRunning = new CountDownLatch(items.size());
        List<Runnable> tasks = wrap(items, consumersRunning);
        try {
            System.out.println("Consuming " + tasks.size() + " tasks.");
            consume(tasks, consumersRunning);
            System.out.println("Consumed tasks.");
        } catch (Exception e) {
            e.printStackTrace();
            stop = true;
        } finally {
            if (stop) {
                System.out.println("Producer stopping.");
            } else {
                long waitTime = frequencyMs - (System.currentTimeMillis() - startTime);
                if (waitTime < 1L) {
                    scheduler.execute(this);
                } else {
                    System.out.println("Next run in " + waitTime + " ms.");
                    producerTask = scheduler.schedule(this, waitTime, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    List<Runnable> wrap(List<Long> items, final CountDownLatch consumersRunning) {

        List<Runnable> tasks = new ArrayList<Runnable>();
        for (Long l : items) {
            tasks.add(new SleepTask(l, consumersRunning));
        }
        return tasks;
    }
} // class Producer

class SleepTask implements Runnable {

    long sleepTime;
    CountDownLatch consumersRunning;

    public SleepTask(long sleepTime, CountDownLatch consumersRunning) {
        this.sleepTime = sleepTime;
        this.consumersRunning = consumersRunning;
    }

    @Override public void run() {

        try {
            Thread.sleep(sleepTime);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumersRunning.countDown();
        }
    }
} // class SleepTask
}
于 2014-08-22T21:56:39.733 回答
0

ExecutorService您可以通过为生产者的每次运行创建一个来很好地解决这个问题。生产者创建它,关闭它,并等待它的终止。

要安排生产者,请使用ScheduledExecutorService.scheduleWithFixedDelay(...)ScheduledExecutorService.scheduleAtFixedRate(...),具体取决于您想要的:

  • scheduleWithFixedDelay(...)将在两次运行之间保持指定的时间量,因此无论运行持续多长时间,下一次运行都将在指定的时间量之后进行:

    ...|XXXXXX|.............|XXXXX|.............|XXXXXXXXX|.............|X <--- fix ---> <--- fix ---> <--- fix --->

  • scheduleAtFixedRate(...)试图保持调度率,所以如果生产者需要更长的时间,两次运行之间的时间会减少,但两次运行永远不会重叠:

    ...|XXXXXXXX|...|XXXXX|......|XXXXXXXXXXXXXXX||XXX|....|XX <--- fix ---><--- fix ---><--- fix ---><--- fix ---><-

生产者外观的简单示例:

public class Producer implements Runnable {
    public void run() {
        ExecutorService executor = ... // create new executor

        // queue items
        for (Object item : itemSource) {
            executor.schedule(new Consumer(item));
        }

        // shutdown executor
        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.HOURS);
    }
}

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

scheduler.scheduleWithFixedDelay(new Producer(), 30, 30, TimeUnit.MINUTES);
// or
scheduler.scheduleAtFixedRate(new Producer(), 30, 30, TimeUnit.MINUTES);

当然,您必须进行适当的异常处理,因为生产者运行中的每个未捕获的异常都会阻止调度程序重新调度它。

请注意,创建 executor 可能会很昂贵,因此只有在消耗项目比创建 executor(在您的情况下 - 似乎是这种情况)昂贵得多的情况下,这种方法才合适。

于 2014-08-22T23:32:11.740 回答