3

I'm trying to understand thread basics, and as a first example I create two thread that write a String on the stdout. As I know the scheduler allows to execute the threads using a round robin schedule. Thats why I got:

PING PING pong pong pong PING PING PING pong pong

Now I want to use a shared variable, so every thread will know if its your turn:

public class PingPongThread extends Thread {
private String msg;
private static String turn;

public PingPongThread(String msg){
    this.msg = msg;
}
@Override
public void run() {
    while(true) {
        playTurn();
    }

}
public synchronized void playTurn(){
    if (!msg.equals(turn)){
        turn=msg;
        System.out.println(msg);
    }
}
}

Main class:

public class ThreadTest {
    public static void main(String[] args) {
        PingPongThread thread1 = new PingPongThread("PING");
        PingPongThread thread2 = new PingPongThread("pong");
        thread1.start();
        thread2.start();
    }
}

I synchronized the "turn manager" but I still get something like:

PING PING pong pong pong PING PING PING pong pong

Can someone explains what I am missing, and Why I'm not getting Ping pong... ping pong. Thanks!

4

8 回答 8

13

在与 Brian Agnew 的讨论结束时,我提交了这段代码,用于java.util.concurrent.Phaser协调你的乒乓球线程:

static final Phaser p = new Phaser(1);
public static void main(String[] args) {
  t("ping");
  t("pong");
}
private static void t(final String msg) {
  new Thread() { public void run() {
    while (true) {
      System.out.println(msg);
      p.awaitAdvance(p.arrive()+1);
    }
  }}.start();
}

此解决方案与您尝试编写的解决方案之间的主要区别在于您的解决方案忙于检查标志,从而浪费 CPU 时间(和能源!)。正确的方法是使用阻塞方法使线程进入睡眠状态,直到它收到相关事件的通知。

于 2012-10-08T18:07:16.850 回答
12

这一行:

public synchronized void playTurn(){
    //code
}

在行为上等同于

public void playTurn() {
    synchronized(this) {
         //code
    }
}

这就是为什么没有发生同步的原因,因为正如 Brian Agnew 指出的那样,线程正在两个不同的对象(thread1、thread2)上同步,每个对象都在它自己的实例上导致没有有效的同步。

如果您将使用轮变量进行同步,例如:

private static String turn = ""; // must initialize or you ll get an NPE

public void playTurn() {
    synchronized(turn) {
         //...
         turn = msg; // (1)
         //...
    }
}

那么情况会好很多(运行多次验证),但也没有 100% 同步。在开始时(大多数情况下)你会得到双乒乓球和双乒乓球,之后它们看起来是同步的,但你仍然可以获得双乒乓球/乒乓球。

同步块锁定(参见这个很好的答案)而不是对该值的引用。(见编辑)

因此,让我们看一下一种可能的情况:

thread1 locks on ""
thread2 blocks on ""
thread1 changes the value of turn variable to "PING" - thread2 can continue since "" is no longer locked 

为了验证我是否尝试过

try {
    Thread.currentThread().sleep(1000); // try with 10, 100 also multiple times
 } 
 catch (InterruptedException ex) {}

之前和之后

turn = msg;

它看起来是同步的?!但是,如果你把

 try {
    Thread.yield();
    Thread.currentThread().sleep(1000); //  also  try multiple times
 } 
 catch (InterruptedException ex) {}

几秒钟后,您会看到双乒乓球/乒乓球。Thread.yield()本质上意味着“我已经完成了处理器,让其他线程工作”。这显然是我的操作系统上的系统线程调度程序实现。

因此,要正确同步,我们必须删除行

    turn = msg;

这样线程可以始终在相同的值上同步 - 不是真的:) 正如上面给出的好答案中所解释的- 字符串(不可变对象)作为锁是危险的 - 因为如果你在程序中的 100 个位置创建字符串“A”,所有 100 references(variables) 将指向内存中相同的“A”-因此您可能会过度同步

所以,要回答你原来的问题,修改你的代码是这样的:

 public void playTurn() {
    synchronized(PingPongThread.class) {
         //code
    }
}

并且并行 PingPong 示例将 100% 正确实施(参见 EDIT^2)。

上面的代码等价于:

 public static synchronized void playTurn() {
     //code
 }

PingPongThread.class 是一个Class 对象,例如,在每个实例上,您都可以调用getClass(),它总是只有一个实例。

你也可以这样做

 public static Object lock = new Object();

 public void playTurn() {
    synchronized(lock) {
         //code
    }
}

此外,阅读和编​​程示例(必要时多次运行)本教程

编辑:

要在技术上正确

同步方法与在此上锁定的同步语句相同。让我们将同步语句的参数称为“lock”——正如 Marko 所指出的,“lock”是一个变量,用于存储对类的对象/实例的引用。引用规范:

同步语句计算对对象的引用;然后它尝试在该对象的监视器上执行锁定操作。

因此,同步并不是基于——对象/类实例,而是基于与该实例/值关联的对象监视器。因为

Java 中的每个对象都与一个监视器相关联。

效果保持不变。

编辑^2:

跟进评论评论:“并行 PingPong 示例将 100% 正确实现” - 意思是,实现了所需的行为(没有错误)。

恕我直言,如果结果正确,则解决方案是正确的。有很多解决问题的方法,所以下一个标准是解决方案的简单/优雅 - 移相器解决方案是更好的方法,因为正如马尔科在一些评论中所说的那样,使用移相器出错的可能性要小得多对象而不是使用同步机制 - 这可以从这篇文章中的所有(非)解决方案变体中看出。值得注意的是代码大小和整体清晰度的比较。

总而言之,只要这种结构适用于所讨论的问题,就应该使用它们。

于 2012-10-08T17:51:09.870 回答
4

的每个实例都在其自身而不是共享资源上PingPongThread同步。为了控制消息传递,您需要在共享资源上同步(例如您的变量?)turn

但是我不认为这真的会奏效。我认为您应该检查wait()notify()执行此操作(如果您想了解线程原语)。请参阅示例。

于 2012-10-08T16:04:22.407 回答
0

我的解决方案是这样的:

public class InfinitePingPong extends Thread  {

    private static final Object lock= new Object();

private String toPrintOut;

    public InfinitePingPong(String s){
        this.toPrintOut = s;
    }


    public void run(){
        while (true){
            synchronized(lock){
                System.out.println(this.toPrintOut +" -->"+this.getId()); 
                lock.notifyAll();

                try {
                    lock.wait();
                } catch (InterruptedException e) {}
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {


        InfinitePingPong a = new InfinitePingPong("ping");
        InfinitePingPong b = new InfinitePingPong("pong");


        a.start();
        b.start();

        b.wait();

        try {
            a.join();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }








}
}
于 2014-10-21T08:17:22.417 回答
0

这是一个用 Java 编写的乒乓球程序。Ping 和 Pong 是独立的线程。每个线程既是消费者又是生产者。当每个线程运行时,它会做两件事

  1. 产生允许对方(作为消费者)运行的消息
  2. 使用导致自身挂起的消息。

该代码基于 Oracles ProducerConsumerExample。请注意,Ping 和 Pong 类的代码和行为几乎相同。OP 代码中的线程仅使用对象监视器的“互斥”部分(正如上面 Brian Agnew 所建议的那样)。他们从不调用等待。因此它们只相互排斥,但从不调用 java 运行时来允许另一个线程运行。

/*
 * Copyright (c) 1995, 2008, Oracle and/or its affiliates. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   - Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *   - Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 *   - Neither the name of Oracle or the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

 * based on oracle example on sync-wait-notify
 * cf. https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html
 * run with java ProducerConsumerExample
 * 
 *
 */ 

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Drop drop = new Drop();
    DropCtoP dropCtoP = new DropCtoP();
    (new Thread(new Ping(drop,dropCtoP))).start();
        (new Thread(new Pong(drop,dropCtoP))).start();
    }
}


public class Pong implements Runnable {
    private Drop drop;
    private DropCtoP dropCtoP;
    private int count=0;

    public Pong(Drop drop,DropCtoP dropCtoP) {
        this.drop = drop;
        this.dropCtoP = dropCtoP;
    }

    public void run() {
        String message;
        for (;;) {
        count++;
            message = drop.take();
            System.out.format("Pong running - : %s - ran num times %d %n", message,count);
            dropCtoP.put("Run ping token");
        }
    }
}



public class Ping implements Runnable {
    private Drop drop;
    private DropCtoP dropCtoP;
    private int count=0;

    public Ping(Drop drop,DropCtoP dropCtoP) {
        this.drop = drop;
        this.dropCtoP = dropCtoP;
    }

    public void run() {

        String message;
        for (;;) {
      count++;
      drop.put("Run pong token");
      message = dropCtoP.take();
      System.out.format("PING running - : %s- ran num times %d %n", message,count);
        }

    }
}



public class DropCtoP {
    // Message sent from producer
    // to consumer.
    private String message;
    // True if consumer should wait
    // for producer to send message,
    // false if producer should wait for
    // consumer to retrieve message.
    private boolean empty2 = true;


    public synchronized String take() {
        // Wait until message is
        // available.
        while (empty2) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty2 = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait until message has
        // been retrieved.
        while (!empty2) {
            try { 
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty2 = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }    
}


public class Drop {
    // Message sent from producer
    // to consumer.
    private String message;
    // True if consumer should wait
    // for producer to send message,
    // false if producer should wait for
    // consumer to retrieve message.
    private boolean empty = true;

    public synchronized String take() {
        // Wait until message is
        // available.
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait until message has
        // been retrieved.
        while (!empty) {
            try { 
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }


}
于 2019-01-15T14:32:39.990 回答
0

一种选择是使用 SynchronousQueue 。

import java.util.concurrent.SynchronousQueue;

public class PingPongPattern {

    private SynchronousQueue<Integer> q = new SynchronousQueue<Integer>();
    private Thread t1 = new Thread() {

        @Override
        public void run() {
            while (true) {

                // TODO Auto-generated method stub
                super.run();
                try {

                    System.out.println("Ping");
                    q.put(1);
                    q.put(2);
                } catch (Exception e) {

                }
            }
        }

    };

    private Thread t2 = new Thread() {

        @Override
        public void run() {

            while (true) {
                // TODO Auto-generated method stub
                super.run();
                try {
                    q.take();
                    System.out.println("Pong");
                    q.take();

                } catch (Exception e) {

                }

            }

        }

    };

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        PingPongPattern p = new PingPongPattern();
        p.t1.start();
        p.t2.start();
    }
}
于 2017-04-04T15:24:01.970 回答
0

可能的实现之一:

public class PingPongDemo {

    private static final int THREADS = 2;

    private static int nextIndex = 0;

    private static String getMessage(int index) {
        return index % 2 == 0 ? "ping" : "pong";
    }

    public static void main(String[] args) throws Throwable {
        var lock = new ReentrantLock();

        var conditions = new Condition[THREADS];
        for (int i = 0; i < conditions.length; i++) {
            conditions[i] = lock.newCondition();
        }

        for (int i = 0; i < THREADS; i++) {
            var index = i;

            new Thread(() -> {
                lock.lock();
                try {
                    while (true) {
                        System.out.println(getMessage(index));
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }

                        nextIndex = (nextIndex + 1) % THREADS;

                        conditions[nextIndex].signal();

                        while (nextIndex != index) {
                            conditions[index].awaitUninterruptibly();
                        }
                    }
                } finally {
                    lock.unlock();
                }
            }).start();

            if (index < THREADS - 1) {
                lock.lock();
                try {
                    while (nextIndex != (index + 1)) {
                        conditions[index + 1].awaitUninterruptibly();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}

在这里,我们有效地进行循环输出。

于 2019-11-18T10:14:57.740 回答
0

这是一个使用Semaphore对象来完成同步的版本:

import java.util.concurrent.*;

public class Main {
    @FunctionalInterface
    public interface QuadFunction<T, U, V, W, R> {
        public R apply(T t, U u, V v, W w);
    }

    public static void main(String[] args) {
        ExecutorService svc = Executors.newFixedThreadPool(2);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Terminating...");
            svc.shutdownNow();
            try { svc.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); }
            catch(InterruptedException e) {};
        }));

        var sem1 = new Semaphore(1);
        var sem2 = new Semaphore(0);

        QuadFunction<String, String, Semaphore, Semaphore, Runnable> fun =
            (name, action, s1, s2) ->
                (Runnable) () -> {
                    try {
                        while (true) {
                            s1.acquire();
                            System.out.format("%s %s\n", name, action);
                            Thread.sleep(500);
                            s2.release(1);
                        }
                    } catch (InterruptedException e) {}
                    s2.release(1);
                    System.out.format("==> %s shutdown\n", name);
                };

        svc.execute(fun.apply("T1", "ping", sem1, sem2));
        svc.execute(fun.apply("T2", "pong", sem2, sem1));
    }
}
于 2020-05-15T13:28:57.910 回答