2

我想我做错了。我正在创建假设从共享队列中处理一些数据的线程。我的问题是程序很慢而且内存占用很大,我怀疑队列可能不像我希望的那样共享。我怀疑这是因为在我的代码中我添加了一条显示队列大小的行,如果我启动 2 个线程,那么我会得到两个具有完全不同数字的输出,并且似乎会自行增加(我认为它可能是相同的数字,但也许它从 100 跳到 2 等等,但在观看后显示 105 和 5 并且以不同的速率运行。如果我有 4 个线程,那么我会看到 4 个不同的数字)。

这是相关部分的片段。我在程序顶部的队列中创建了一个带有我想要的数据的静态类

static class queue_class {
        int number;
        int[] data;
        Context(int number,  int[] data) {
            this.number = number;
            this.data = data;
        }
    }

然后我在向可调用对象发送一些作业后创建队列..

static class process_threaded implements Callable<Void> {
    // queue with contexts to process
    private Queue<queue_class> queue;

    process_threaded(queue_class request) {
        queue = new ArrayDeque<queue_class>();
        queue.add(request);
    }

    public Void call() {
        while(!queue.isEmpty()) {
            System.out.println("in contexts queue with a size of " + queue.size());
            Context current = contexts.poll();
            //get work and process it, if it work great then the solution goes elsewhere
            //otherwise, depending on the data, its either discarded or parts of it is added back to queue
            queue.add(new queue_class(k, data_list)); 

如您所见,数据有 3 个选项,如果数据良好则发送出去,如果数据非常糟糕则丢弃或发送回队列。我认为队列在被发回时正在运行,但我怀疑是因为每个线程都在自己的队列上工作,而不是共享队列。

这个猜测正确吗?我做错了吗?

4

2 回答 2

2

您的评估是正确的,每个线程(可能)都在使用自己的队列,因为您在Callable. (实际上有一个很奇怪Callable<Void>——那不就是一个Runnable吗?)

还有其他问题,例如,您使用的队列不是线程安全的,或者您的代码在编写时无法编译。

但是,重要的问题是,您真的需要首先显式创建队列吗?为什么不ExecutorService给你提交你Callable的s(或者Runnables如果你决定进行那个切换):将对执行者的引用传递到你Callable的s中,他们可以将新Callable的s添加到执行者的任务队列中以运行。无需重新发明轮子。

例如:

static class process_threaded implements Runnable {
    // Reference to an executor
    private final ExecutorService exec;
    // Reference to the job counter
    private final AtomicInteger jobCounter;
    // Request to process
    private queue_class request;

    process_threaded( ExecutorService exec, AtomicInteger counter, queue_class request) {
        this.exec = exec;
        this.jobCounter = counter;
        this.jobCounter.incrementAndGet(); // Assuming that you will always
                                           // submit the process_threaded to
                                           // the executor if you create it.
        this.request = request;
    }

    public run() {
        //get work and process **request**, if it work great then the solution goes elsewhere
        //otherwise, depending on the data, its either discarded or parts of are added back to the executor
        exec.submit( new process_threaded( exec, new queue_class(k, data_list) ) );

        // Can do some more work

        // Always run before returning: counter update and notify the launcher
        synchronized(jobCounter){
            jobCounter.decrementAndGet();
            jobCounter.notifyAll();
        }
    }
}

编辑:

为了解决您何时关闭执行程序的问题,我认为最简单的解决方案是有一个作业计数器,并在达到 0 时关闭。对于线程安全AtomicInteger来说,可能是最好的选择。我在上面添加了一些代码来合并更改。然后您的启动代码将如下所示:

void theLauncher() {

    AtomicInteger jobCounter = new AtomicInteger( 0 );

    ExecutorService exec = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcesses());

    exec.submit( new process_threaded( exec, jobCounter, someProcessRequest ) );
    // Can submit some other things here of course...

    // Wait for jobs to complete:
    for(;;jobCounter.get() > 0){
        synchronized( jobCounter ){ // (I'm not sure if you have to have the synchronized block, but I think this is safer.
            if( jobCounter.get() > 0 )
                jobCounter.wait();
        }
    }

    // Now you can shutdown:
    exec.shutdown();
}
于 2012-04-14T04:31:06.607 回答
2

不要重新发明轮子!使用ConcurrentLinkedQueue怎么样?从javadocs:

基于链接节点的无界线程安全队列。此队列对元素进行 FIFO(先进先出)排序。队列的头部是在队列中时间最长的元素。队列的尾部是在队列中时间最短的元素。新元素被插入到队列的尾部,队列检索操作获取队列头部的元素。当许多线程将共享对公共集合的访问时,ConcurrentLinkedQueue 是一个合适的选择。

于 2012-04-14T04:45:29.133 回答