7

我正在编写一个模仿电影院的多线程应用程序。每个参与的人都是自己的线程,并发必须完全由信号量完成。我遇到的唯一问题是如何基本上链接线程以便它们可以通信(例如通过管道)。

例如:

Customer[1] 是一个线程,它获取一个信号量,让它走到票房。现在客户[1] 必须告诉票房代理他们想看电影“X”。然后 BoxOfficeAgent[1] 也是一个线程,必须检查以确保电影未满,然后卖票或告诉 Customer[1] 选择另一部电影。

如何在保持信号量并发的同时来回传递数据?

此外,我可以从 java.util.concurrent 中使用的唯一类是Semaphore类。

4

2 回答 2

8

在线程之间来回传递数据的一种简单方法是使用BlockingQueue<E>位于包中的接口实现java.util.concurrent

此接口具有将元素添加到具有不同行为的集合的方法:

  • add(E): 如果可能,添加,否则抛出异常
  • boolean offer(E): 如果元素已被添加,则返回 true,否则返回 false
  • boolean offer(E, long, TimeUnit): 尝试添加元素,等待指定的时间
  • put(E): 阻塞调用线程,直到元素被添加

它还定义了具有类似行为的元素检索方法:

  • take(): 阻塞直到有可用的元素
  • poll(long, TimeUnit): 检索一个元素或返回 null

我最常使用的实现是ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue

第一个,ArrayBlockingQueue,具有固定大小,由传递给其构造函数的参数定义。

第二个,LinkedBlockingQueue,大小不限。它将始终接受任何元素,即offer立即返回 true,add永远不会抛出异常。

第三个,对我来说最有趣的一个SynchronousQueue,就是一个管道。你可以把它想象成一个大小为 0 的队列。它永远不会保留一个元素:如果有其他线程试图从中检索元素,这个队列只会接受元素。相反,如果有另一个线程试图推送它,检索操作只会返回一个元素。

为了完成专门使用信号量完成的同步作业要求,您可以从我给您的有关 SynchronousQueue 的描述中获得启发,并编写一些非常相似的内容:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}

请注意,此类呈现出与我描述的 SynchronousQueue 类似的行为。

一旦方法put(E)被调用,它就会获取写入信号量,该信号量将保留为空,以便对同一方法的另一个调用将在其第一行阻塞。然后,此方法存储对正在传递的对象的引用,并释放读取的信号量。此版本将使调用该take()方法的任何线程都可以继续进行。

然后,该方法的第一步take()自然是获取读取信号量,以禁止任何其他线程同时检索元素。在元素被检索并保存在局部变量中后(练习:如果删除该行 E e = this.e 会发生什么?),该方法释放写入信号量,以便put(E)可以通过以下方式再次调用该方法任何线程,并返回已保存在局部变量中的内容。

作为重要说明,请注意对正在传递的对象的引用保存在私有字段中,并且方法take()put(E)都是final。这是最重要的,而且经常被忽略。如果这些方法不是最终的(或者更糟糕的是,该字段不是私有的),那么继承类将能够改变契约的行为take()put(E)破坏契约。

最后,您可以避免在方法中声明局部变量,take()方法try {} finally {}如下:

class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}

在这里,如果只是为了展示这个例子的用途,try/finally那么没有经验的开发人员不会注意到这一点。显然,在这种情况下,没有真正的收获。

哦,该死的,我已经为你完成了你的作业。在报应方面——为了测试你对信号量的了解——为什么不实现 BlockingQueue 合约定义的其他一些方法呢?例如,您可以实现一个offer(E)方法和一个take(E, long, TimeUnit)!

祝你好运。

于 2011-04-09T04:40:30.703 回答
1

考虑一下带有读/写锁的共享内存。

  1. 创建一个缓冲区来放置消息。
  2. 应该使用锁/信号量来控制对缓冲区的访问。
  3. 将此缓冲区用于线程间通信目的。

问候

PKV

于 2011-04-09T06:43:29.660 回答