我想以多线程的方式阅读10个邮件账户的未读邮件。
但如果线程池大小为 5,则将使用线程池中的 5 个线程。每个线程将读取一个邮件帐户。因此,一旦 Thread_1 读取了第一个邮箱,它就应该读取 mailbox_6。然后线程 2 将读取mailbox_7。
当所有邮件帐户都被读取一次后,循环将从第一个邮件帐户开始。
我们如何在java中做到这一点?
我想以多线程的方式阅读10个邮件账户的未读邮件。
但如果线程池大小为 5,则将使用线程池中的 5 个线程。每个线程将读取一个邮件帐户。因此,一旦 Thread_1 读取了第一个邮箱,它就应该读取 mailbox_6。然后线程 2 将读取mailbox_7。
当所有邮件帐户都被读取一次后,循环将从第一个邮件帐户开始。
我们如何在java中做到这一点?
这应该很容易。您创建一个具有 5 个线程的固定线程池,然后将 10 个作业提交到池中——每个用户电子邮件帐户 1 个:
// create a thread pool with 5 workers
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// submit all 10 user email accounts to the pool to be processed in turn
for (UserEmail userEmail : userEmailsToProcess) {
threadPool.submit(new EmailProcessor(userEmail));
}
// once we have submitted all jobs to the thread pool, it should be shutdown
threadPool.shutdown();
...
// here's our runnable class which does the actual work
public class EmailProcessor implements Runnable {
private UserEmail userEmail;
public MyJobProcessor(UserEmail userEmail) {
this.userEmail = userEmail;
}
public void run() {
// read the user email
...
}
}
该类UserEmail
可以保存要“阅读”的电子邮件的文件名,或者可能是帐户名或其他内容。这将由您决定如何表示邮件帐户和要阅读的邮件。
[[来自评论:]]
我有 10 个邮箱,比如..mailbox1...mailbox10,现在我有 5 个线程池中的线程,所以 thread1 将获取任何邮箱,所以假设它会选择 mailbox1,然后 thread2 会选择 mailbox2,thread3 会选择 mailbox3,thread4 会选择邮箱 4 和线程 5 将选择邮箱 5,现在当线程 1(已定义特定时间段)空闲时,应该选择邮箱 6 - 邮箱 10,任何尚未阅读的人不应从邮箱 1 - 邮箱 5 中选择任何邮箱,最多所有尚未阅读的邮箱。
我懂了。一种解决方案是让调度线程每隔一段时间就休眠和唤醒以查看邮箱是否有任何邮件。如果他们这样做,那么它会将作业提交给要读取该邮箱的线程池。一旦邮箱被读取,线程就会返回并要求处理下一个邮箱。调度线程将继续向线程池添加邮箱,直到被告知停止。
如果在 an 中有很多上下文,EmailProcessor
那么您可以拥有线程池,但他们可以从 aBlockingQueue<File>
或其他东西中消费,告诉他们哪些邮箱需要注意。
执行器可能不是这里的最佳解决方案,但为简单起见,我将使用 Gray 代码的变体。为了连续扫描您的 10 个邮箱,您可以执行以下操作,尽管您必须添加一些代码来处理干净终止:
// Create one semaphore per mailbox
Semaphore semaphores[] = new Semaphore[10]
for (int s = 0; s < semaphores.length; s ++) {
semaphores[s] = new Semaphore(1);
}
// create a thread pool with 5 workers
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// submit all 10 user email accounts to the pool to be processed in turn
for (int i = 0; i < 5; i ++) {
threadPool.submit(userEmailsToProcess, semaphores);
}
// once we have submitted all jobs to the thread pool, it should be shutdown
threadPool.shutdown();
...
// here's our runnable class which does the actual work
public class EmailProcessor implements Runnable {
private UserEmail userEmailToProcess[];
private Semaphore semaphores[];
public MyJobProcessor(UserEmail userEmailToProcess[], Semaphore semaphores[]) {
this.userEmailsToProcess = userEmailToProcess;
this.semaphores = semaphores;
}
public void run() {
while (true) { // you could use a semaphore here to test program termination instead
for (int s = 0; s < semaphores.size; s ++) {
if (semaphores[s].tryAcquire()) {
UserEmail email = userEmailToProcess[s];
// read the user email
…
semaphores[s].release();
}
}
}
}
}
这是一个快速而肮脏的解决方案,虽然不是 100% 公平,但它适用于任意数量的线程和邮箱。在您有 10 封电子邮件和 5 个工作人员的特殊情况下,您可以让每个线程连续扫描邮箱的子集,即线程 1 检查邮箱 1 然后邮箱 2,线程 2 检查邮箱 3 然后邮箱 4,...没有争议
如何简单地创建任务的并发(或同步,在这种情况下并不重要)集合(可能是队列)。每个任务将从电子邮件帐户加载所需的数据。然后让每个线程从集合中获取一个任务,直到它为空。
每个线程都将引用此集合并对其执行循环。当集合不为空时,从中取出一个任务并处理它。
跟踪阅读的电子邮件帐户。例如定义这样的东西,
//total number of email accounts that need to be read.
private int noOfEmails=10;
//the thread pool that is used to read the emails
private ExecutorService threadPool = Executors.newFixedThreadPool(5);
//the tracker array that keeps track of the emails that
//are already read. This array is cleared only after all 10
//emails are read.
private ArrayList<String> emailTracker=new ArrayList<String>(noOfEmails);
//any changes to emailTracker should be synchronized as
//this is a common data shared between all 5 threads.
private Object syncObject=new Object();
在 Runnable 实现中,检查 emailTracker 是否包含您的电子邮件帐户标识符,如果是,则表明它已被读取,因此返回并等待 emailTracker 被清除。当所有 10 个电子邮件帐户都被读取时,它将被清除。
if(emailTracker.contains(email.identifier))
{
return;
}
//read email.
email.read();
//simple synchronization.
synchronized (syncObject)
{
//read email
emailTracker.add(email.identifier);
//if all emails are read, clear the tracker
//This will allow reading of all the emails
//once again.
if(emailTracker.size()==noOfEmails)
{
emailTracker.clear();
}
}
Sanju I believe you want to execute them in the following round-robin fashion:
Thread1: 1,6 and so on Thread2: 2,7 Thread3: 3,8 Thread4: 4,9 Thread5: 5,10
First of all this is not what threads are meant for if you want to execute them in a sequence like that. Second I dont think it is possible with thread pool. If you still want them then here is my solution for executing threads in round-robin fashion which you may alter based on your need: public class EmailRoundRobin { public Object[] locks;
private static class EmailProcessor implements Runnable {
private final Object currentLock;
private final Object nextLock;
private UserEmail userEmail;
public EmailProcessor (UserEmail userEmail,, Object currentLock, Object nextLock) {
this.userEmail = userEmail;
this.currentLock = currentLock;
this.nextLock = nextLock;
}
@Override
public void run() {
try {
work = //reading email
while ( work != null) {
try {
currentLock.wait();
// Do your work here.
}
catch(InterruptedException e) {}
synchronized(nextLock) {
nextLock.notify();
}
}//while ends
} catch (IOException e) {
e.printStackTrace();
}
synchronized(nextLock) {
nextLock.notify(); /// Ensures all threads exit at the end
}
}
public EmailRoundRobin(int numberOfAccountsToRead) {
locks = new Object[numberOfAccountsToRead];
//Initialize lock instances in array.
for(i = 0; i < numberOfAccountsToRead; ++i) locks[i] = new Object();
//Create threads
int j;
for(j=0; j<(numberOfAccountsToRead-1); j++ ){
Thread linePrinterThread = new Thread(new EmailProcessor(emailInfo + "Temp" + j,locks[j],locks[j+1]));
linePrinterThread.start();
}
Thread lastLinePrinterThread = new Thread(new EmailProcessor(emailInfo + "Temp" + j,locks[numberOfFilesToRead-1],locks[0]));
lastLinePrinterThread.start();
}
public void startProcessing() {
synchronized (locks[0]) {
locks[0].notify();
}
}
public static void main(String[] args) {
EmailRoundRobin emailRoundRobin = new EmailRoundRobin(4);
emailRoundRobin.startPrinting();
}
}
I have taken this from my answer to [this question] (Running threads in round robin fashion in java)! which had similar requirement. There is an option using Phaser also mentioned in that answer as well.
我自己得到了答案,它非常简单,我使用了 ExecutorService 并且它只由它管理。