在线程之间来回传递数据的一种简单方法是使用BlockingQueue<E>
位于包中的接口实现java.util.concurrent
。
此接口具有将元素添加到具有不同行为的集合的方法:
add(E)
: 如果可能,添加,否则抛出异常
boolean offer(E)
: 如果元素已被添加,则返回 true,否则返回 false
boolean offer(E, long, TimeUnit)
: 尝试添加元素,等待指定的时间
put(E)
: 阻塞调用线程,直到元素被添加
它还定义了具有类似行为的元素检索方法:
take()
: 阻塞直到有可用的元素
poll(long, TimeUnit)
: 检索一个元素或返回 null
我最常使用的实现是ArrayBlockingQueue
:LinkedBlockingQueue
和SynchronousQueue
。
第一个,ArrayBlockingQueue
,具有固定大小,由传递给其构造函数的参数定义。
第二个,LinkedBlockingQueue
,大小不限。它将始终接受任何元素,即offer
立即返回 true,add
永远不会抛出异常。
第三个,对我来说最有趣的一个SynchronousQueue
,就是一个管道。你可以把它想象成一个大小为 0 的队列。它永远不会保留一个元素:如果有其他线程试图从中检索元素,这个队列只会接受元素。相反,如果有另一个线程试图推送它,检索操作只会返回一个元素。
为了完成专门使用信号量完成的同步作业要求,您可以从我给您的有关 SynchronousQueue 的描述中获得启发,并编写一些非常相似的内容:
class Pipe<E> {
private E e;
private final Semaphore read = new Semaphore(0);
private final Semaphore write = new Semaphore(1);
public final void put(final E e) {
write.acquire();
this.e = e;
read.release();
}
public final E take() {
read.acquire();
E e = this.e;
write.release();
return e;
}
}
请注意,此类呈现出与我描述的 SynchronousQueue 类似的行为。
一旦方法put(E)
被调用,它就会获取写入信号量,该信号量将保留为空,以便对同一方法的另一个调用将在其第一行阻塞。然后,此方法存储对正在传递的对象的引用,并释放读取的信号量。此版本将使调用该take()
方法的任何线程都可以继续进行。
然后,该方法的第一步take()
自然是获取读取信号量,以禁止任何其他线程同时检索元素。在元素被检索并保存在局部变量中后(练习:如果删除该行 E e = this.e 会发生什么?),该方法释放写入信号量,以便put(E)
可以通过以下方式再次调用该方法任何线程,并返回已保存在局部变量中的内容。
作为重要说明,请注意对正在传递的对象的引用保存在私有字段中,并且方法take()
和put(E)
都是final。这是最重要的,而且经常被忽略。如果这些方法不是最终的(或者更糟糕的是,该字段不是私有的),那么继承类将能够改变契约的行为take()
并put(E)
破坏契约。
最后,您可以避免在方法中声明局部变量,take()
方法try {} finally {}
如下:
class Pipe<E> {
// ...
public final E take() {
try {
read.acquire();
return e;
} finally {
write.release();
}
}
}
在这里,如果只是为了展示这个例子的用途,try/finally
那么没有经验的开发人员不会注意到这一点。显然,在这种情况下,没有真正的收获。
哦,该死的,我已经为你完成了你的作业。在报应方面——为了测试你对信号量的了解——为什么不实现 BlockingQueue 合约定义的其他一些方法呢?例如,您可以实现一个offer(E)
方法和一个take(E, long, TimeUnit)
!
祝你好运。