在这种情况下(任何版本),我试图找出 Java 中的最佳方法来实现单个生产者多个消费者,其中我使用 ExecutorService(根据偏好,不需要)生产者需要“永远”运行但每次它运行时,它需要等待所有内容都被完全处理完,例如所有消费者线程都已终止,队列为空,并且没有剩余的项目要生产。生产者也应该只以固定的时间间隔轮询其数据源。
举个例子:我希望我的生产者每 30 分钟轮询一次它的数据源以获取记录并将它们提交到队列中。如果消费者的处理时间超过 30 分钟,我希望生产者等到所有项目都处理完毕后再再次轮询其数据源(自 30 分钟过去后它会立即这样做)。
不是在找人为我写代码。一些基本的提示/指导将不胜感激。
这是我正在尝试使用的缩短示例实现。我已经采取了所有可怕的尝试来解决这个问题。请注意,用于构造 ThreadPoolExecutor 的硬编码参数最终将是动态的。
import java.util.concurrent.*;
public class ItemProcessorService {
public static void main(String args[]) throws InterruptedException {
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
int corePoolSize = 5,
maxPoolSize = 10,
keepAlive = 10,
queueCapacity = 1;
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);
for (int i = 0; i < 10; i++) {
executor.execute(new ItemConsumer());
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Executor finished");
}
}
class ItemConsumer implements Runnable {
@Override
public void run() {
processItem();
}
private void processItem() {
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + " - processed item");
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " - rejected");
}
}