我有两种类型的顺序数据流。并且,每种类型的数据都有一个订阅(监听)的观察者。
Observer observer1 = new Observer() {
@Override
public void next(Data data) {
//Do something
}
}
Observer observer2 = new Observer() {
@Override
public void next(Data data) {
//Do something
}
}
feed.subscribe(observer1, "Type1");
feed.subscribe(observer2, "Type2");
Type1
当接收到的一条数据时 ,会调用的next()
方法和 也是一样的。observer1
Type2
observer2
而且由于数据是顺序的,当我们有 and 的数据时type1
,type2
第二个应该等待第一个的进程(将被阻塞)。
现在我想用多线程来解决这个问题。但是,我不想为收到的每条数据创建一个线程。我希望观察者并行运行。
例如,如果我有这个序列:
type1, type1, type1, type2, ...
第二个和第三个数据应该等待第一个处理(因为它们是type1
),但第四个应该被收集observer2
和处理。
请注意,我使用的 API 只允许我订阅,它会处理这些next()
方法。
我想到的解决方案是在线程和观察者上进行映射。因此,在每次调用方法时,我可以调用分配给该类型的线程来恢复next()
,而不是创建一个新的。Thread
或者,如果有一个线程运行observer1
,我希望所有数据都type1
留在队列中等待该线程处理它们。
在Java中解决这个问题的任何设计模式或数据结构?