0

我的主类,根据一些规则生成多个线程。(20-40 个线程可以长时间存活)。每个线程创建几个线程(短时间)-> 我正在为此使用执行器。我需要在短时间线程中处理多维数组-->我像在下面的代码中那样编写它-->但我认为它效率不高,因为我将它多次传递给这么多线程/任务--. 我试图直接从线程中访问它(通过将其声明为公共 --> 不成功) --> 将很乐意获得有关如何改进它的评论/建议。我还研究了下一步以返回一个一维数组作为结果(在 Assetfactory 类中更新它可能会更好)--> 我不知道该怎么做。请看下面的代码。谢谢帕兹

import java.util.concurrent.*;
import java.util.logging.Level;

public class AssetFactory implements Runnable{
    private volatile boolean stop = false;
    private volatile String feed ;
    private double[][][] PeriodRates= new double[10][500][4];

    private String TimeStr,Bid,periodicalRateIndicator;
    private final BlockingQueue<String> workQueue;
    ExecutorService IndicatorPool = Executors.newCachedThreadPool();

    public AssetFactory(BlockingQueue<String> workQueue) {
      this.workQueue = workQueue;
    }   

    @Override
    public void run(){
      while (!stop) {
       try{   
          feed = workQueue.take(); 
            periodicalRateIndicator = CheckPeriod(TimeStr, Bid) ;
            if (periodicalRateIndicator.length() >0) {
                IndicatorPool.submit(new CalcMvg(periodicalRateIndicator,PeriodRates));
            } 

          } 

          if ("Stop".equals(feed)) {
              stop = true ;
          }

       } // try
       catch (InterruptedException ex) {
           logger.log(Level.SEVERE, null, ex);
           stop = true;
       }


      } // while
    } // run  

这是 CalcMVG 类

public class CalcMvg implements Runnable {
    private double [][][] PeriodRates = new double[10][500][4];

    public CalcMvg(String Periods, double[][][] PeriodRates) {
        System.out.println(Periods); 
        this.PeriodRates = PeriodRates ;
    }

    @Override
    public void run(){
       try{
          // do some work with the data of PeriodRates array e.g. print it (no changes to array
          System.out.println(PeriodRates[1][1][1]);
          }
       catch (Exception ex){
          System.out.println(Thread.currentThread().getName() + ex.getMessage());
          logger.log(Level.SEVERE, null, ex);
         }
      }//run

  } // mvg class
4

2 回答 2

4

这里发生了几件事似乎是错误的,但是由于提供的代码数量有限,很难给出一个好的答案。

首先是实际的编码问题:

  • 如果只有一个线程访问它(停止,馈送),则无需将变量定义为 volatile

  • 您应该在该函数中本地声明仅在本地上下文(运行方法)中使用的变量,而不是全局声明整个实例(几乎所有变量)。这允许 JIT 进行各种优化。

  • InterruptedException 应该终止线程。因为它是作为终止线程工作的请求而抛出的。

  • 在您的代码示例中,workQueue 似乎什么也没做,只是让线程进入睡眠状态或停止它们。为什么它不立即为实际的工作线程提供所需的工作量?

然后是代码结构问题:

  • 您使用线程来为线程提供工作。这是低效的,因为您只有有限数量的内核可以实际完成工作。由于线程的执行顺序是未定义的,因此 IndicatorPool 可能大部分处于空闲状态或被尚未完成的任务过度填充。

  • 如果您有有限的工作要做,ExecutorCompletionService可能对您的任务有所帮助。

我认为您将通过重新设计代码结构获得最佳的速度提升。想象一下(假设我正确理解了您的问题):

  • 有一个由某些数据源(例如文件流、网络)提供的任务阻塞队列。

  • 一组等于核心数量的工作线程正在该数据源上等待输入,然后对其进行处理并放入完成队列。

  • 一个特定的数据集是您工作的“终结者”(例如“null”)。如果一个线程遇到这个终止符,它会完成它的循环并关闭。

现在,对于此构造,以下内容适用:

案例一:数据源是瓶颈。它不能通过使用多个线程来加速,因为如果您更频繁地询问,您的硬盘/网络将无法更快地工作。

案例 2:您机器上的处理能力是瓶颈,因为您处理的数据不能超过机器上的工作线程/内核可以处理的数据。

在这两种情况下,结论都是,工作线程需要在准备好处理新数据后立即寻找新数据。因为要么他们需要被搁置,要么他们需要限制传入的数据。这将确保最大吞吐量。

如果所有工作线程都已终止,则工作完成。这可以通过使用CyclicBarrierPhaser类来跟踪 iE。

工作线程的伪代码:

public void run() {
  DataType e;
  try {
    while ((e = dataSource.next()) != null) {
      process(e);
    }
    barrier.await();
  } catch (InterruptedException ex) {
  }
}

我希望这对您的情况有所帮助。

于 2013-10-13T12:13:03.900 回答
0

将数组作为参数传递给构造函数是一种合理的方法,尽管除非您打算复制数组,否则不必使用大数组初始化 PeriodRates。分配一大块内存然后立即在构造函数中重新分配其唯一引用似乎很浪费。我会这样初始化它:

private final double [][][] PeriodRates;

public CalcMvg(String Periods, double[][][] PeriodRates) {
    System.out.println(Periods); 
    this.PeriodRates = PeriodRates;
}

另一种选择是将 CalcMvg 定义为 AssetFactory 的内部类,并将 PeriodRate 声明为 final。这将允许 CalcMvg 的实例访问 AssetFactory 的外部实例中的 PeriodRate。

返回结果比较困难,因为它涉及跨线程发布结果。一种方法是使用同步方法:

private double[] result = null;

private synchronized void setResult(double[] result) {
    this.result = result;
}

public synchronized double[] getResult() {
    if (result == null) {
        throw new RuntimeException("Result has not been initialized for this instance: " + this);
    }

    return result;
}  

Java 库中提供了更高级的多线程概念,例如Future,在这种情况下可能是合适的。

关于您对线程数量的担忧,允许库类管理线程池的工作分配可能会解决这个问题。像Executor这样的东西可能会对此有所帮助。

于 2013-10-13T12:17:13.297 回答