2

在下面的代码中,在极其罕见的情况下(10 亿次执行 QueueThread 对象中的 3 次),如果 block 和 queue.size 原来是 7999,它会达到下面提到的情况。可能的原因是什么。

 if(q.size()<batchsize){   
     System.out.println("queue size" +q.size());   
 }

基本上它无法执行 queue.add 语句,但执行线程中的所有其他语句。

代码片段如下。

import java.util.concurrent.ConcurrentLinkedQueue;   
import java.util.concurrent.ExecutorService;   
import java.util.concurrent.Executors;   
import java.util.concurrent.atomic.AtomicInteger;   

public class CLinkQueueTest {   


    public static final int itersize=100000;   
    public static final int batchsize=8000;   
    public static final int poolsize=100;   

    public static void main (String args[]) throws Exception{   
        int j= 0;   
        ExecutorService service = Executors.newFixedThreadPool(poolsize);   
        AtomicInteger counter = new AtomicInteger(poolsize);   
        ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();   
        String s ="abc";   

        while(j<itersize){   
            int k=0;   
            while(k<batchsize){   
                counter.decrementAndGet();   
                service.submit(new QueueThread(counter, q, s));   
                if(counter.get()<=0){   
                    Thread.sleep(5);   
                }   
                k++;   
            }   
            if(j%20 ==0){   
                System.out.println("Iteration no " + j);   
            }   
            while(counter.get() < poolsize){   
                //wait infinitely   
            }   
            if(q.size()<batchsize){   
                System.out.println("queue size" +q.size());   
            }   
            q.clear();   
            j++;               
        }   

        System.out.println("process complete");   
    }   


import java.util.Queue;   
import java.util.concurrent.Callable;   
import java.util.concurrent.ConcurrentLinkedQueue;   
import java.util.concurrent.atomic.AtomicInteger;   

public class QueueThread implements Callable<Boolean> {   

    private AtomicInteger ai;   
    private Queue<String> qu;   
    private String st;   

    public QueueThread(AtomicInteger i, Queue<String> q, String s){   
        ai = i;   
        qu = q;   
        st = s;        
    }   

    @Override  
    public Boolean call() {   
        try{   
            qu.add(st);            
        } catch(Throwable e){   
            e.printStackTrace();   
        }finally{   
            ai.incrementAndGet();   

        }   
        return true;   
    }   

}  
4

2 回答 2

0

会不会有一次它在队列中注册了一个太少的条目是因为 Executor 没有完成它的处理?

很明显,每次QueueThread.call()调用队列都会添加并AtomicInteger递增。我能想到的只是一个call还没有被执行。

也许你可以通过使用类似的东西对系统更友好一点:

        while(counter.get() < poolsize){   
            //wait infinitely   
            Thread.currentThread().sleep(5);
        }   

但那只是我的个人意见。

于 2013-09-11T12:43:44.563 回答
0

方法文档ConcurrentLinkedQueue.size说:

请注意,与大多数集合不同,此方法不是恒定时间操作。由于这些队列的异步特性,确定当前元素的数量需要 O(n) 遍历。此外,如果在此方法执行期间添加或删除元素,则返回的结果可能不准确。因此,这种方法在并发应用程序中通常不是很有用。

于 2013-09-11T12:43:51.660 回答