0

我正在开发的一个应用程序大量使用了异步处理,我正在寻找一种更好的方式来组织代码。

在 servlet 上接收系统的外部输入。此 servlet 收集的原始数据存放在队列中。线程池针对该队列运行,并将原始数据解析为结构化记录,然后将其存入一组 N 个队列中的一个。选择队列使得所有同类记录都进入同一个队列。这 N 个队列每个都由一个线程服务,该线程将相同类型的记录收集到一个集合中。每分钟一个计划任务唤醒并将前一分钟收集的每种类型的所有记录写入文件。

目前,此代码使用一堆队列、线程池和不断运行的可运行文件来组织,这使得逻辑难以遵循。我想将此代码重构为上述数据流更明确的内容。我正在寻找实现这一目标的工具和方法。

  • RxJava 之类的工具对此有帮助吗?如果是这样,怎么做?
  • 我应该考虑其他方法吗?
4

1 回答 1

2

这是根据您的描述的 RxJava 示例。希望它会帮助你。

public class TestServlet extends HttpServlet {

private static final PublishSubject<String> o = PublishSubject.<String>create();

public static Observable<String> getServletObservable() {
    return o.observeOn(Schedulers.computation());
}

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {
    synchronized (TestServlet.class) {
        o.onNext("value");
    }
}

}

class Foo {
public static void main(String[] args) {
    TestServlet.getServletObservable().map(new Func1<String, String>() {

        @Override
        public String call(String t1) {
            // do something
            return null;
        }

    }).groupBy(new Func1<String, String>() {

        @Override
        public String call(String t1) {
            // do something
            return null;
        }

    }).subscribe(new Observer<GroupedObservable<String, String>>() {

        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(GroupedObservable<String, String> group) {
            group.observeOn(Schedulers.io()).subscribe(new Observer<String>() {

                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String t) {
                    // store t
                }

            });
        }

    });
}
}
于 2014-03-18T02:47:37.420 回答