2

在此代码段中,我创建了一个管道并在一端附加一个 Scanner,在另一端附加一个 PrintStream,以便在多个消费者和生产者线程之间进行通信。然后我创建并启动三个线程:

  1. 第一个线程是消费者线程。它检查扫描器以查看是否有一行文本可供使用,使用它,打印到标准输出,然后休眠几毫秒,然后重复。如果没有什么可消费的,那么它会打印一条关于它的消息,休眠并重复。

  2. 此代码段中的第二个线程什么也不做。更多关于下面的内容。

2.5 在第 3 个线程启动之前有 3 秒的延迟。

  1. 第三个线程是生产者,只为第一个线程生成文本消息以供消费。它产生一条消息,睡觉

public static void main(String[] args) throws IOException
{
    PipedInputStream pis = new PipedInputStream();
    PipedOutputStream pos = new PipedOutputStream(pis);
    Scanner scan = new Scanner(pis);
    PrintStream ps = new PrintStream(pos);

    new Thread()
    {
        public void run()
        {
            int x = 0;
            while (true)
            {
                x++;
                if (scan.hasNextLine())
                {
                    System.out.println("pulled: " + scan.nextLine());
                } else
                {
                    if (x % 100 == 0)
                    {
                        System.out.println("no data to pull");
                    }
                }
                try
                {
                    sleep(10);
                } catch (InterruptedException ex) { }
            }
        }
    }.start();

    new Thread()
    {
        public void run()
        {
        }
    }.start();

    try
    {
        sleep(3000);
    } catch (InterruptedException ex) { }

    new Thread()
    {
        public void run()
        {
            int x = 0;
            while (true)
            {
                x++;
                ps.println("hello: " + x);
                try
                {
                    sleep(1000);
                } catch (InterruptedException ex) {}
            }
        }
    }.start();
}

输出(如我所料):

pulled: hello: 1
pulled: hello: 2
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6

还要注意 scan.nextLine() 是阻塞的(因为没有消息表明没有数据可用......数据总是“可用”,即使它“在路上”)。

现在,如果我用一些代码替换第二个线程的主体,该代码会产生一些文本供第一个线程使用:

new Thread()
{
    public void run()
    {
        ps.println( "Interfere");
    }
}.start();

然后我开始触发第一个线程的无数据子句:

pulled: Interfere
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull

因此,如果第二个线程开始使用 PrintStream 对象来生成消息,那么管道中就会出现问题,并且消费者线程将无法在另一端找到消息。

现在事情变得更奇怪了。如果我阻止第二个线程完成,例如将其放入一个非常长的循环中,则管道不会被粘住:

new Thread()
{
    public void run()
    {
        ps.println("interfere");
        for ( long i = 0; i < 10000000000L; i++ );
        System.out.println("done interfering" );
    }
}.start();

输出:

pulled: interfere
pulled: hello: 1
pulled: hello: 2
done interfering
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6

所以我认为如果第二个线程在第三个线程开始生产之前终止,那么第一个线程将永远不会从第三个线程获得任何消息。但是,如果第二个线程设法挂起直到第三个线程开始生产,那么一切都会按预期进行。

这里发生了什么?第二个线程在终止时是否正在关闭管道/流(或对管道/流执行其他操作)?如果是这样,为什么?如果第三个线程在第二个线程终止之前开始使用管道/流,为什么它似乎没有关闭(或对管道/流执行任何操作)?当第二个线程产生消息并在第三个线程启动之前终止时,有没有办法使此代码按预期“工作”(即第一个线程消耗任何一个/两个生产者线程产生的任何东西)?

背景:这是系统基本组件的缩影,其中多个客户端将使用来自单个生产者线程的消息。但是,在所有客户端线程都发出信号表明它们已准备好之前,无法启动生产者线程。对于每个客户端线程,都有另一个线程查询它们是否准备好。一旦所有客户端线程都发出信号表明它们已准备就绪,就会启动生产者线程。我试图让线程通过流进行通信,以便稍后我可以将它们分布在多台计算机上,并使用套接字设置管道,而对底层代码的更改最少。在这里也可以随意提出替代解决方案策略,但我想了解为什么上述解决方案不起作用。

4

2 回答 2

1

您的Scanner实例在其方法中遇到异常,该异常readInput将其sourceClosed字段设置为true并阻止您阅读。如果您对实际发生的位置感兴趣:

private void readInput() {
    ...

    int n = 0;
    try {
        n = source.read(buf);
    } catch (IOException ioe) {
        lastException = ioe;
        n = -1;
    }

    if (n == -1) {
        sourceClosed = true;
        needInput = false;
    }

    ...
}

这种行为没有,您需要修复底层异常。这里的问题是java.io.IOException: Write end dead. 有很多答案和博客文章可以帮助您比我更好地解决这个问题。另请查看相关的“Read end dead”问题。查看:

于 2016-06-02T02:22:45.790 回答
0

所涉及的流对象不是线程安全的,因此在不同步的情况下从不同线程访问它们时的行为是不可预测的。我猜想在这种特殊情况下,行为的差异与实际刷新到主内存的内容有关,但这只是一个猜测。

要获得可预测的行为,您需要在线程之间进行适当的同步。

于 2016-06-02T01:42:21.700 回答