我想在Java中实现类,它将等待来自不同线程的新数据,当他得到它时,这个类将处理它并再次等待新数据。我想只使用synchronized、wait、notifyAll命令来实现这一点。我尝试了一些变体:
1) 使用一个线程,通过命令 lockObject.wait() 等待。但是当所有活动线程完成工作时,该线程将永远等待。当然,我可以创建方法 stopProcess(),但它不安全,因为另一个程序员可能会忘记调用它。
2)使用一个守护线程,它不会工作,因为当所有活动线程完成他们的工作时,我的守护线程死了,但他可以有一些他必须处理的数据
3)当新数据到来时 - 创建新线程,它将处理数据。当线程处于活动状态时(他处理给定的数据),他将接收新数据。当没有数据到来并且所有旧数据都被处理时,线程完成工作。这个变体的减号是 - 当数据通过某个时期(当线程有时间处理旧数据并死亡时),将创建一个新线程。我认为这对性能或/和内存不利。我对吗?
是否可以只使用一个或两个(可能结合使用守护进程和活动线程)线程而不使用 stopProcess() 方法来解决我的问题?
这里有一些代码
我对阻塞队列的认识
public class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
public void add(T el){
synchronized (queue){
queue.add(el);
}
}
public T getFirst(){
synchronized (queue){
return queue.poll();
}
}
public int getSize(){
synchronized (queue){
return queue.size();
}
}
}
数据类
public class Data {
//some data
public void process(){
//process this data
}
}
代码的第一个变体
public class ProcessData {
private BlockingQueue<Data> queue = new BlockingQueue<Data>();
private boolean run = false;
private Thread processThread;
private Object lock = new Object();
public synchronized void addData(Data data) throws Exception {
if (run){
if (data != null){
queue.add(data);
wakeUpToProcess();
}
}else{
throw new Exception("");
}
}
public synchronized void start() {
if (!run){
run = true;
processThread = new Thread(new Runnable() {
public void run() {
while (run || queue.getSize()!=0){
while(queue.getSize() == 0 && run){
//if stopProcess was not called
//and no active threads
//it will not die
waitForNewData();
}
Data cur;
while(queue.getSize() > 0){
cur = queue.getFirst();
cur.process();
}
}
}
});
processThread.start();
}
}
public synchronized void stopProcess() {
if (run){
run = false;
wakeUpToProcess();
}
}
private void waitForNewData(){
try{
synchronized (lock){
lock.wait();
}
}catch (InterruptedException ex){
ex.printStackTrace();
}
}
private void wakeUpToProcess(){
synchronized (lock){
lock.notifyAll();
}
}
}
在第二个变体中,我将 processThread 作为守护进程。但是当活动线程死亡时,processThread 完成工作,但队列中有一些数据,我必须处理。
第三种变体
public class ProcessData {
private BlockingQueue<Data> queue = new BlockingQueue<Data>();
private boolean run = false;
private Thread processThread = null;
public synchronized void addData(Data data) throws Exception {
if (run){
if (data != null){
queue.add(data);
wakeExecutor();
}
}else{
throw new Exception("ProcessData is stopped!");
}
}
public synchronized void start() {
if (!run){
run = true;
}
}
public synchronized void stopProcess() {
if (run){
run = false;
}
}
public boolean isRunning(){
return this.run;
}
protected void wakeExecutor(){
if (processThread ==null || !processThread.isAlive()){
processThread = new Thread(new Runnable() {
@Override
public void run() {
Data cur;
while(queue.getSize() > 0){
cur = queue.getFirst();
cur.process();
}
}
});
processThread.start();
}
}
}
重要的是,数据必须按照来自线程的顺序进行处理。