1

我没有太多制作多线程应用程序的经验,但我觉得我的程序正处于可以从拥有多个线程中受益的地步。我正在做一个更大规模的项目,其中涉及使用分类器(如机器学习)对大约 32000 个客户进行分类。我调试了程序,发现对每个用户进行分类大约需要一秒钟。所以换句话说,这需要 8.8 小时才能完成!

有什么方法可以运行 4 个线程,每个线程处理 8000 个用户?第一个线程将处理 1-8000,第二个 8001-16000,第三个 16001-23000,第四个 23001-32000。此外,到目前为止,每个分类都是通过调用另一个类的静态函数来完成的......

然后当除主线程之外的其他线程应该结束时。这样的事情可行吗?如果是这样,如果有人可以提供有关如何执行此操作的提示或步骤,我将不胜感激。我熟悉关键部分(等待/信号)的概念,但对它没有什么经验。

再次,非常感谢任何帮助!欢迎提供有关如何处理此类情况的提示和建议!不确定这是否重要,但我有一台处理器速度为 2.53 GHZ 的 Core 2 Duo PC。

4

4 回答 4

2

这对于 Apache Hadoop 来说太轻量级了,它每台服务器需要大约 64MB 的数据块......但是......对于 Akka Actors 来说这是一个完美的机会,而且它恰好支持 Java!

http://doc.akka.io/docs/akka/2.1.4/java/untyped-actors.html

基本上,你可以让 4 个参与者来做这项工作,当他们完成对用户的分类,或者可能更好的是,一些用户,他们要么将其传递给“接收者”参与者,后者将信息放入数据结构或文件中对于输出,或者,您可以通过让每个文件自己写入文件来进行并发 I/O。然后可以在文件全部完成后检查/组合文件。

如果你想变得更花哨/更强大,你可以把演员放在远程服务器上。与他们沟通仍然很容易,而且您将利用多台服务器的 CPU/资源。

我自己写了一篇关于 Akka 演员的文章,但它是在 Scala 中的,所以我就不说了。但是如果你在谷歌上搜索“akka actor”,你会得到很多关于如何使用它的手把手的例子。勇敢一点,直接潜入并进行实验。“演员系统”是一个很容易理解的概念。我知道你可以做到这一点!

于 2013-10-25T02:32:36.620 回答
1

如果将输入数组拆分为 4 个相等的子数组用于 4 个线程,则不能保证所有线程同时完成。您最好将所有数据放在一个队列中,并让所有工作线程从该公共队列中提供数据。使用ad-safe BlockingQueue 实现,以便不编写低级同步/等待/通知代码。

于 2013-10-25T06:24:22.427 回答
1

将数据拆分为实现 Runnable 的对象,然后将它们传递给新线程。

在这种情况下拥有四个以上的线程不会杀死你,但你不能获得比核心更多的并行工作(如评论中所述) - 如果线程多于核心,系统将不得不处理谁去什么时候。

如果我有一个班级客户,并且我想发布一个线程来优先考虑更大集合中的 8000 个客户,我可能会执行以下操作:

public class CustomerClassifier implements Runnable {

  private customer[] customers;

  public CustomerClassifier(customer[] customers) {
     this.customers = customers;
  }
  @Override
  public void run() {
    for (int i=0; i< customers.length; i++) {
      classify(customer);//critical that this classify function does not
                         //attempt to modify a resource outside this class
                         //unless it handles locking, or is talking to a database
                         //or something that won't throw fits about resource locking
    }
  }  
}

然后在其他地方发布这些线程

int jobSize = 8000;
customer[] customers = new customer[jobSize]();
int j = 0;
for (int i =0; i+j< fullCustomerArray.length; i++) {
  if (i == jobSize-1) {
    new Thread(new CustomerClassifier(customers)).start();//run will be invoked by thread
    customers = new Customer[jobSize]();
    j += i;
    i = 0;
  }
  customers[i] = fullCustomerArray[i+j];
}

如果您的分类方法在某处影响相同的资源,则您将不得不实施锁定,并且还会在某种程度上扼杀您获得的优势。

并发非常复杂,需要很多思考,我还建议查看 oracle 文档http://docs.oracle.com/javase/tutorial/essential/concurrency/index.html (我知道链接不好,但希望oracle 文档不会移动太多?)

免责声明:我不是并发设计或多线程(不同主题)方面的专家。

于 2013-10-25T03:01:20.463 回答
0

从 java 6 开始,我们有了一些方便的并发工具。您可能需要考虑使用线程池来实现更简洁的实现。

package com.threads;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParalleliseArrayConsumption {

    private int[] itemsToBeProcessed ;

    public ParalleliseArrayConsumption(int size){
        itemsToBeProcessed = new int[size];
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        (new ParalleliseArrayConsumption(32)).processUsers(4);

    }

    public void processUsers(int numOfWorkerThreads){
         ExecutorService threadPool = Executors.newFixedThreadPool(numOfWorkerThreads);
         int chunk = itemsToBeProcessed.length/numOfWorkerThreads;
         int start = 0;
         List<Future> tasks = new ArrayList<Future>();
         for(int i=0;i<numOfWorkerThreads;i++){
             tasks.add(threadPool.submit(new WorkerThread(start, start+chunk)));
             start = start+chunk;
         }
             // join all worker threads to main thread
         for(Future f:tasks){
             try {
                f.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
         }

        threadPool.shutdown();
        while(!threadPool.isTerminated()){
        }

    }

    private class WorkerThread implements Callable{

        private int startIndex;
        private int endIndex;

        public WorkerThread(int startIndex, int endIndex){
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }

        @Override
        public Object call() throws Exception {
            for(int currentUserIndex = startIndex;currentUserIndex<endIndex;currentUserIndex++){
                // process the user. Add your logic here
                System.out.println(currentUserIndex+" is the user being processed in thread " +Thread.currentThread().getName());
            }
            return null;
        }       

    }

}
于 2013-10-28T23:00:53.213 回答