6

我最近开始使用 Java 中的多线程

我有一个问题要解决一个问题,我只有 5 个线程,范围从T1, T2,...T5

任务是按以下顺序打印从 1 到 10 的数字。

T1 -> 1
T2 -> 2
T3 -> 3
T4 -> 4
T5 -> 5
T1 -> 6
T2 -> 7
T3 -> 8
T4 -> 9
T5 -> 10

我尝试用这段代码解决它。

public static void main(String[] args) throws InterruptedException {
    Counter counter = new Counter();
    Thread[] tArray = new Thread[] { new Thread(counter, "T1"), new Thread(counter, "T2"),
            new Thread(counter, "T3"), new Thread(counter, "T4"), new Thread(counter, "T5") };
    for (int i = 0; i < 10; i++) {
        if (i < 5) {
            tArray[i].start();
            tArray[i].join();
        } else {
            tArray[i - 5] = new Thread(counter, "T" + ((i - 5) + 1)); //Instantiating new Thread which is not allowed.
            tArray[i - 5].start();
            tArray[i - 5].join();
        }
    }
}

public class Counter implements Runnable {

    private int count = 0;

    @Override
    public synchronized void run() {
       System.out.println(Thread.currentThread().getName() + " -> " + ++count);
    }

}

但由于只允许 5 个线程,因此不接受我的解决方案,因为我也在循环块中进行实例new Thread化。elsefor

任何帮助将不胜感激。

4

5 回答 5

5

您需要安排线程之间的交互。线程交互最自然的方式是设置连接线程的阻塞队列。队列可以是独立的对象,也可以属于特定的线程。在您的情况下,您需要制作一个由 5 个线程组成的圆圈。

class CountPrinter extends Thread {
   String name;
   ArrayBlockingQueue<Integer> inp = new ArrayBlockingQueue<>();
   CountPrinter next;

   public void run() {
      for (;;)
         int n = inp.take();
         if (n == 11) {// only 10 numbers must be printed
            next.inp.put(11);
            return;
         }
         System.out.println(name+"->"+n);
         next.inp.put(n+1);
      }
   }
}

在创建线程之后和开始之前,您需要分配字段namenext. 我相信你可以自己编程。此外,必须为第一个线程提供初始值1

于 2018-09-27T14:59:00.467 回答
1

免责声明:我正在回答 OP 问题的实际对应物——串行输入和输出的并行处理。它更有趣。

思考过程

  1. 我有一个串行资源 - System.out. 无论我如何构建代码,前面都会有显式或隐式的排队/争用。
  2. 处理争用的最佳方法是通过显式队列(可以观察、量化和寻址,与使用互斥锁或同步块上的隐式队列相反)。
  3. My 是一个 3 步管道:ProduceStringizeOutput
  4. Stringize步骤可以并行完成,前提是排序Output仍然可以发生。
  5. 我从一个快速而肮脏的“穷人”解决方案开始。Java 8这将与CompletableFuture-s 一起使用:

    final Executor inputWorker = newSingleThreadExecutor();
    final Executor processingPool = newFixedThreadPool(3);
    final Executor outputWorker = newSingleThreadExecutor();
    
    final int[] counter = {-1}; // this emulates a non-thread-safe information source
    CompletableFuture<Void> future = completedFuture(null);
    for (int i = 0; i < 10; ++i) {
        future = future // chaining of futures is essential for serializing subsequent iterations
                .thenApplyAsync(unused -> ++counter[0], inputWorker)
                .thenApplyAsync(Objects::toString, processingPool)
                .thenAcceptAsync(System.out::println, outputWorker);
    }
    future.join();
    
  6. 一旦我对它的工作原理有了很好的直觉,我可能会考虑使用诸如actordisruptor之类的工业技术来进一步改进它。

PS - 为了完整起见,可能希望步骤 #5 略有不同,首先创建整个计算计划,然后触发它:

final Executor producer = newSingleThreadExecutor();
final Executor stringizer = newFixedThreadPool(3);
final Executor printer = newSingleThreadExecutor();

final int[] counter = {-1}; // this emulates a non-thread-safe information source

System.out.println("creating schedule...");
// first schedule the whole amount of work and block the execution on a single "trigger" future
final CompletableFuture<Void> trigger = new CompletableFuture<>();
CompletableFuture<Void> future = trigger;
for (int i = 0; i < 10; ++i) {
    future = future
            .thenApplyAsync(unused -> ++counter[0], producer)
            .thenApplyAsync(Objects::toString, stringizer)
            .thenAcceptAsync(System.out::println, printer);
}

// then pull the trigger
System.out.println("pulling the trigger...");
trigger.complete(null);
future.join();
于 2018-09-27T11:46:30.747 回答
0
public static void main(String... args) {
    class LocalTask extends Thread {

        public static final int TOTAL = 5;

        private final int a;
        private final int b;
        private final AtomicInteger lock;

        public LocalTask(int id, AtomicInteger lock) {
            super("T" + id);
            a = id;
            b = id + TOTAL;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (lock) {
                        int count = lock.get();
                        String name = Thread.currentThread().getName();

                        if (count == a)
                            System.out.println(name + " -> " + a);
                        else if (count == b)
                            System.out.println(name + " -> " + b);
                        else
                            continue;

                        lock.incrementAndGet();
                        lock.notifyAll();

                        if (count == a)
                            lock.wait();
                        else
                            return;
                    }
                }
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    AtomicInteger lock = new AtomicInteger(1);

    for (int i = 1; i <= LocalTask.TOTAL; i++)
        new LocalTask(i, lock).start();
}
于 2018-09-27T17:05:30.520 回答
0

另一种方法是保留两个AtomicIntegers ,如下所示:

static class MyRunnable implements Runnable {

    private final AtomicInteger index;
    private final AtomicInteger ai;
    private final int[] array;
    private final int current;
    private final int next;

    public MyRunnable(AtomicInteger index, AtomicInteger ai, int[] array, int current, int next) {
        this.index = index;
        this.ai = ai;
        this.array = array;
        this.current = current;
        this.next = next;
    }

    @Override
    public void run() {
        for (;;) {
            if (index.get() == array.length) {
                break;
            }
            if (ai.get() == current) {
                System.out.println(Thread.currentThread().getName() + " " + array[index.getAndIncrement()]);
                ai.compareAndSet(current, next);
            }
        }
    }
}

用法是:

public static void main(String[] args) {

    int[] array = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

    AtomicInteger ai = new AtomicInteger(1);
    AtomicInteger index = new AtomicInteger(0);

    Thread t1 = new Thread(new MyRunnable(index, ai, array, 1, 2), "T1");
    Thread t2 = new Thread(new MyRunnable(index, ai, array, 2, 3), "T2");
    Thread t3 = new Thread(new MyRunnable(index, ai, array, 3, 4), "T3");
    Thread t4 = new Thread(new MyRunnable(index, ai, array, 4, 5), "T4");
    Thread t5 = new Thread(new MyRunnable(index, ai, array, 5, 1), "T5");

    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();

}
于 2018-09-27T13:31:52.157 回答
0

您需要允许您的 5 个线程进行通信,以便它们以严格的顺序运行。有点像多米诺骨牌倒下,每个线程必须等待直到它被戳,然后它会做它的事情并对下一个线程说'你现在可以走了'。

但与倒下的多米诺骨牌不同,他们还必须重新站起来,以便为下一轮做好准备,最后一张多米诺骨牌必须再次击倒第一张多米诺骨牌!

我认为这种通信的最佳内置是Phaser. 这是使用 Phasers 的实现:

import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public final class SequencedThreads
{
    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        PrintThread[] threads = new PrintThread[5];

        // Create our 5 threads, each with a phaser waiting on itself 
        // and the previous thread
        for(int i = 0; i < 5; i++) {
            threads[i] = new PrintThread("T" + (i + 1));
            if(i > 0)
                threads[i - 1].next = threads[i].phaser;
        }

        // Join the last thread back to the first thread
        threads[4].next = threads[0].phaser;

        // Start our threads
        for(PrintThread pt : threads)
            pt.start();

        // Trigger the first thread to print
        threads[0].phaser.arriveAndDeregister();
    }

    private static final class PrintThread extends Thread {
        Phaser phaser;
        Phaser next;

        public PrintThread(String name) {
            super(name);
            this.phaser = new Phaser(2);
        }

        @Override
        public void run() {
            while(true) {
                // Block until we are poked
                phaser.arriveAndAwaitAdvance();
                int newCount = count.incrementAndGet();
                if(newCount > 10) {
                    // We are done, but trigger the other threads to allow 
                    // the JVM to exit
                    next.arriveAndDeregister();
                    return;
                }

                System.out.println(getName() + " -> " + newCount);
                // Pick ourselves back up
                phaser.register();
                // Poke the next domino
                next.arriveAndDeregister();
            }
        }
    }
}
于 2018-09-27T15:04:23.040 回答