我正在使用 OSGI 框架来制作一个严重依赖数据包处理的应用程序。
每个包处理一个包,然后将其发送到下一个包。我想要的是每个捆绑包都可以并行工作。所以我想让每个包在它自己的线程或多个线程中运行。“问题”是 OSGI 并不真正支持多线程。在同一个 JVM 上运行的每个包只在 1 个线程中运行,因此它遵循同步模型。
我的想法:所以应用程序的性质有点像生产者消费者。Bundle A 提供了一个服务,它带有一个用于向 A 发送包的方法,我们称之为方法 ain。Bundle B 也有类似的设置,C 也有。它们都有 aa/b/cout 方法,在这个方法中它们使用下一个 bundle 的服务,所以在 A.aout 中你可以像这样调用 bin:bservice.bin(package )。
所以每个包都是数据包的消费者和生产者,这让我认为使用 ExecutorService 和 BlockingQueues 可能有效,但我不太确定如何在包之间“正确”实现这一点,并且它们都是消费者和生产者 我不太确定这是否是解决这个问题的最佳方法。
我希望你们能提供帮助和/或有任何建议。
- 编辑 -
捆绑一个 AServiceImplementation
public class AServiceImplementation implements AService {
private BService bservice;
public void bindBService(BService service) {
bservice = service;
System.out.println("bundle gateway bound to b service");
}
public void unbindBService(BService service) {
this.bservice = null;
}
public void process(int id) {
bservice.transmitIn(id);
}
}
Bundle B BService 实现
public class BServiceImplementation implements BService {
private CService cservice;
public void bindCService(CService service) {
cservice = service;
System.out.println("bundle gateway bound to b service");;
}
public void unbindCService(CService service) {
this.cservice = null;
}
public void transmitIn(int id){
// So if I would implement it THIS is where I would assign the data to
// a thread to get it processed in the process method.
// but doesn't that make THIS method, transmitIn a bottleneck since all packages
// need to pass through here?
process(id);
}
public void process(int id) {
// Lots of processing happens here
}
}
我真的不明白如何做到这一点,例如捆绑A通过transmitIn方法将数据传输到捆绑B而不会成为瓶颈,因为我会在该方法中将我的“工作分配”到不同的线程(如在上面的代码)