1

这是我的应用程序的简化版本,显示了我在做什么。

/*
in my app's main():

    Runner run = new Runner();

    run.dowork();

*/

class Runner
{
    private int totalWorkers = 2;
    private int workersDone = 0;

    public synchronized void workerDone()
    {
        workersDone++;
        notifyAll();
    }

    public synchronized void dowork()
    {
        workersDone = 0;

        //<code for opening a file here, other setup here, etc>

        Worker a = new Worker(this);
        Worker b = new Worker(this);

        while ((line = reader.readLine()) != null)
        {
            //<a large amount of processing on 'line'>

            a.setData(line);
            b.setData(line);

            while (workersDone < totalWorkers)
            {
                wait();
            }               
        }
    }
}

class Worker implements Runnable
{
    private Runner runner;
    private String data;

    public Worker(Runner r)
    {
        this.runner = r;
        Thread t = new Thread(this);
        t.start();
    }

    public synchronized void setData(String s)
    {
        this.data = s;
        notifyAll();
    }

    public void run
    {
        while (true)
        {
            synchronized(this)
            {
                wait();

                //<do work with this.data here>

                this.runner.workerDone();
            }
        }
    }
}

这里的基本概念是我有一群工作人员,他们都独立地对传入的数据行进行一些处理,并在他们喜欢的任何地方写出数据 - 他们不需要将任何数据报告回主线程或彼此共享数据。

我遇到的问题是这段代码死锁了。我正在阅读一个超过 100 万行的文件,我很幸运在我的应用程序停止响应之前获得了 100 行。

实际上,工人们都在做不同数量的工作,所以我想等到他们都完成后再去下一条生产线。

我不能让工作人员以不同的速度处理并在内部对数据进行排队,因为我正在处理的文件太大而无法放入内存。

我不能给每个工人自己的 FileReader 来独立获取“线”,因为我在工人看到它之前就在线上进行了大量处理,并且不想在每个工人中重新进行处理。

我知道我在 Java 中缺少一些相当简单的同步方面,但我被困在这一点上。如果有人能解释我在这里做错了什么,我将不胜感激。我相信我误解了同步的某些方面,但我没有尝试修复它的想法。

4

2 回答 2

3

直接使用synchronized,wait()notify()绝对是棘手的。

幸运的是,Java 并发 API为这类事情提供了一些非常直观的控制对象。特别是看CyclicBarrierand CountDownLatch; 其中之一几乎肯定会是您正在寻找的东西。

对于这种情况,您可能还会发现一个ThreadPoolExecutor很方便的。

这是您的代码段的一个简单示例/转换,它产生以下输出(当然没有死锁):

读取线:1号线
等待工作完成在线:1号
线 在线工作:1号线 在线
工作:1号线
读取线:2号线 在线
等待工作完成:2号线 在线
工作:2号线
在线工作:第 2
行 读取行:第 3 行
等待在线工作完成:第 3
行 在线工作:第 3 行 在线
工作:第 3 行
全部工作完成!

public class Runner
{

    public static void main(String args[]) {
        Runner r = new Runner();
        try {
            r.dowork();
        } catch (IOException e) {
            // handle
            e.printStackTrace();
        }
    }

    CyclicBarrier barrier;
    ExecutorService executor;
    private int totalWorkers = 2;

    public Runner() {
        this.barrier = new CyclicBarrier(this.totalWorkers + 1);
        this.executor = Executors.newFixedThreadPool(this.totalWorkers);
    }

    public synchronized void dowork() throws IOException
    {
        //<code for opening a file here, other setup here, etc>
        //BufferedReader reader = null;
        //String line;

        final Worker worker = new Worker();

        for(String line : new String[]{"Line 1", "Line 2", "Line 3"})
        //while ((line = reader.readLine()) != null)
        {
            System.out.println("Read line: " + line);
            //<a large amount of processing on 'line'>

            for(int c = 0; c < this.totalWorkers; c++) {
                final String curLine = line;
                this.executor.submit(new Runnable() {
                    public void run() {
                        worker.doWork(curLine);
                    }
                });
            }

            try {
                System.out.println("Waiting for work to be complete on line: " + line);
                this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }

        System.out.println("All work complete!");
    }

    class Worker
    {
        public void doWork(String line)
        {
            //<do work with this.data here>
            System.out.println("Working on line: " + line);

            try {
                Runner.this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
    }    
}
于 2008-12-08T21:23:38.427 回答
0

恕我直言,您错误地放置了“workersDone = 0”。

public synchronized void dowork()
        {
                // workersDone = 0;

                //<code for opening a file here, other setup here, etc>

                Worker a = new Worker(this);
                Worker b = new Worker(this);

                while ((line = reader.readLine()) != null)
                {
                        workersDone = 0;

                        //<a large amount of processing on 'line'>

                        a.setData(line);
                        b.setData(line);

                        while (workersDone < totalWorkers)
                        {
                                wait();
                        }                               
                }
        }
于 2008-12-08T21:23:15.043 回答