4

我正在使用 OSGI 框架来制作一个严重依赖数据包处理的应用程序。

每个包处理一个包,然后将其发送到下一个包。我想要的是每个捆绑包都可以并行工作。所以我想让每个包在它自己的线程或多个线程中运行。“问题”是 OSGI 并不真正支持多线程。在同一个 JVM 上运行的每个包只在 1 个线程中运行,因此它遵循同步模型。

我的想法:所以应用程序的性质有点像生产者消费者。Bundle A 提供了一个服务,它带有一个用于向 A 发送包的方法,我们称之为方法 ain。Bundle B 也有类似的设置,C 也有。它们都有 aa/b/cout 方法,在这个方法中它们使用下一个 bundle 的服务,所以在 A.aout 中你可以像这样调用 bin:bservice.bin(package )。

所以每个包都是数据包的消费者和生产者,这让我认为使用 ExecutorService 和 BlockingQueues 可能有效,但我不太确定如何在包之间“正确”实现这一点,并且它们都是消费者和生产者 我不太确定这是否是解决这个问题的最佳方法。

我希望你们能提供帮助和/或有任何建议。

- 编辑 -

捆绑一个 AServiceImplementation

public class AServiceImplementation implements AService {

    private BService bservice;

    public void bindBService(BService service) {
        bservice = service;
        System.out.println("bundle gateway bound to b service");
    }

    public void unbindBService(BService service) {
        this.bservice = null;
    }

    public void process(int id) {
        bservice.transmitIn(id);        
    }
}

Bundle B BService 实现

public class BServiceImplementation implements BService {

    private CService cservice;

    public void bindCService(CService service) {
        cservice = service;
        System.out.println("bundle gateway bound to b service");;
    }

    public void unbindCService(CService service) {
        this.cservice = null;
    }

    public void transmitIn(int id){
        // So if I would implement it THIS is where I would assign the data to 
        // a thread to get it processed in the process method.
        // but doesn't that make THIS method, transmitIn a bottleneck since all packages
        // need to pass through here?
        process(id);
    }

    public void process(int id) {
        // Lots of processing happens here  
    }
}

我真的不明白如何做到这一点,例如捆绑A通过transmitIn方法将数据传输到捆绑B而不会成为瓶颈,因为我会在该方法中将我的“工作分配”到不同的线程(如在上面的代码)

4

3 回答 3

9

Ahum, a full misunderstanding. OSGi could not care less what you do with concurrency, one of its great advantages is that it does not change the computing model like so many app servers do. From a bundle or package point of view, threads are utterly irrelevant. So any Java solution works here.

In your example all processing would take place on the initial callers thread, the one that calls AServiceImplementation. If you want to have processing in parallel you can use an executor for every task you submit, this is the pattern where Executors come in handy. This makes the processing async so you have no return value. Watch out that processing will be limited by the # of available threads in the Executor you provide. You also have to write your processing code very careful to handle proper synchronization for non-local variables.

Another method is to queue. Each service call creates an object with the task's information and this object is then posted into a blocking queue. In an OSGi activator you create a Thread that reads the queue and processes it. In this model you can process without worrying (too much) about concurrency since all the processing always happens in the same thread. For performance reasons you could start multiple threads, but processing is than a bit harder.

However, my advice is to skip this optimization and built your system as simple as possible first. If it runs, and you find you have performance problems, it is early enough to start worrying about these issues. With your current level of understanding I am afraid that you stand the chance to create a highly complex solution to a 'possible' problem.

于 2013-07-23T10:30:22.350 回答
4

“问题”是 OSGI 并不真正支持多线程。在同一个 JVM 上运行的每个包只在 1 个线程中运行,因此它遵循同步模型。

对不起,但这完全是个误会。让我引用 OSGi 核心规范,第 4.2.7 节:

OSGi 框架在多线程环境中运行。框架启动后,会启动bundles,这些bundles会被激活。激活的包通常启动后台线程或对来自其他包的事件作出反应。也就是说,在 start 方法返回后,框架已经进入 ACTIVE 状态,许多包可以在不同的线程上忙碌。

换句话说,您可以自由创建任何适合您需求的线程解决方案。这不是 OSGi 问题。

更新:

您的服务实现可以像这样共享一个 ExecutorService:

public class BServiceImplementation implements BService {

private ExecutorService executorService;

private CService cservice;

public void bindCService(CService service) {
    cservice = service;
    System.out.println("bundle gateway bound to b service");
}

public void bindExecutorService(ExecutorService executorService) {
    this.executorService = executorService;
}

public void unbindCService(CService service) {
    this.cservice = null;
}

public void transmitIn(final int id) {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            process(id);
        }
    });
}

public void process(int id) {
    // Lots of processing happens here
}
}

然后让另一个包将 ThreadPoolExecutor 公开为 ExecutorService。

于 2013-07-23T09:19:11.757 回答
1

我想知道您是否不能在激活方法中使用以下代码行:

public void activate(){
new Thread(this).start();
}

现在你只需要让你的bundles Runnable,并实现run() 方法。在此运行方法中,您可以让捆绑包的消费者端等待新任务。

然后让每个消费者(即通信包的服务端)定义一个 BlockingQueue。当捆绑包(生产者端)绑定到某个服务时,您还要确保持有对该队列的引用。这样,每个捆绑包应该只有一个队列。

于 2013-07-23T08:37:08.683 回答