0

我正在尝试在OSGi中实现一个服务,它应该等待来自另一个包的传入数据并在接收到数据时处理数据。我正在使用 a LinkedBlockingQueue,因为我不知道我会收到多少个数据包。我的代码如下所示:

public class MyClass {

protected static LinkedBlockingQueue<JSONObject> inputQueue = new LinkedBlockingQueue<JSONObject>();
private ExecutorService workerpool = Executors.newFixedThreadPool(4);

public void startMyBundle() {
    start();
}

protected void start() {
    new Thread(new Runnable() {
        public void run() {
            while(true){
                workerpool.execute(new Runnable() {
                    public void run() {
                        try {
                            process(inputQueue.take());
                        } catch (InterruptedException e) {
                            System.out.println("thread was interrupted.");
                        }
                    }
                });
            }
        }
    }).start();
}

public void transmitIn(JSONObject packet) {
    try {
        inputQueue.put(packet);
    } catch (InterruptedException e) {

    }
}

protected  void process(JSONObject packet) {
    //Some processing
}

当我运行它并且只向服务发送一个数据包时,首先会按原样处理数据包,但随后我的处理器会使用其所有容量,并且大多数时候我OutOfMemoryError看起来像这样:

java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "[Timer] - Periodical Task (Bundle 46) (Bundle 46)"

这可能是什么原因?

4

3 回答 3

1

由于这些代码行,您会遇到内存不足异常:

while(true){
   workerpool.execute(new Runnable() {
   ...

这将永远创建新Runnable实例并将它们添加到线程池的任务队列中。这些进入无界队列并迅速填满内存。

我认为你想要inputQueue.take()一个while (true)循环调用的 4 个线程。

for (int i = 0; i < 4; i++) {
    workerpool.execute(new Runnable() {
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                process(inputQueue.take());
            }
        }
    });
}
// remember to shut the queue down after you've submitted the last job
workerpool.shutdown();

此外,您不需要Thread将任务提交到线程池中。这是一个非阻塞操作,因此可以由调用者直接完成。

于 2013-07-30T14:15:53.137 回答
0

这段代码是罪魁祸首:

protected void start() {
    new Thread(new Runnable() {
        public void run() {
            while(true){
                workerpool.execute(new Runnable() {
                    public void run() {
                        try {
                            process(inputQueue.take());
                        } catch (InterruptedException e) {
                            System.out.println("thread was interrupted.");
                        }
                    }
                });
            }
        }
    }).start();
}

它所做的是创建一个后台任务,将无限数量的任务添加RunnableExecutorService工作队列中。这最终会导致 OOME。

我想你的意思是:

protected void start() {
    for (int i = 0; i < 4; ++i) {
        workerpool.execute(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        process(inputQueue.take());
                    } catch (InterruptedException e) {
                        //killed, exit.
                        return;
                    }
                }
            }
        });
    }
}

ExecutorService即在等待输入的情况下运行 4 个工作人员。

于 2013-07-30T14:18:20.303 回答
0

好的,有点迂腐,但因为这是一个 OSGi 标记的问题......

  1. 清理——你正在创建一个线程和执行器服务,但从不清理它。通常,您需要一对激活/停用方法,并且在停用后不留下任何残留物。从凝聚力的角度来看,您希望在一个对象中看到这一点,而不需要一个中心点来管理它。声明式服务非常适合这种模式。
  2. 共享——一般来说你想和其他人共享一个 Executor,最好从服务注册中心获取一个 Executor。这将允许部署人员根据系统中所有捆绑包的使用情况调整线程数。

还有一件事,鲍里斯给出了一个正确的解决方案,但它不是很有效,因为它总是占用 4 个线程和一个无界的 LinkedQueue。更糟糕的是,代码像服务一样走路,像服务一样说话,但似乎没有被用作服务。我认为我们可以做得更好,因为队列 + 执行器有点加倍,在 OSGi 中这应该是一个服务。

@Component
public class JSONPackageProcessor implement TransmitIn {
  Executor executor;

  @Reference void setExecutor(Executor e) { this.executor = e; }

  public void transmitIn( final JSONPacket packet ) {
    executor.execute(new Runnable() {
       public void run() { 
         try { process(packet); } 
         catch(Throwable t) { log(packet, t); }
       }
    }
  }

  void process( JSONPacket packet ) { ... }
}

process(...)假设总是“很快”结束,这不需要清理。在此模型中,流不会像您对池中的(任意?)4 个工作线程所做的那样受到限制。Executor 的内部队列用于缓冲。您可以将其限制如下:

  Semaphore throttle= new Semaphore(4)

  public void transmitIn( final JSONPacket packet ) throws Exception {
    throttle.acquire();
    executor.execute(new Runnable() {
       public void run() { 
         try { process(packet); } 
         catch(Throwable t) { log(packet, t); }
         finally { throttle.release(); }
    }
  }

您甚至可以很容易地通过 Configuration Admin 进行配置:

 @Activate void configure( Map<String,Object> map) throws Exception {
   if ( map.containsKey("throttle") )
     throttle = new Semaphore( map.get("throttle"));
 }

这段代码的美妙之处在于记录了大多数错误情况,之前/之后的并发关系是正确的,因为您在 OSGi 中获得了保证。这段代码实际上可以按原样工作(不保证某些拼写错误,实际上并没有运行它)。

于 2013-07-31T07:09:10.437 回答