2

我在 Android 上,我创建了一个Observable触地得分。只要新事件在一秒钟内到来,我就想缓冲它们。从最后一次着陆发射经过一秒钟后,我想创建一个所有收集到的事件的列表并发出它,然后开始一个新的收集。

我有 2 个可以工作的代码片段,但它们要么看起来不太被动(#1),要么看起来不太复杂(#2)。他们来了:

  1. 我使用buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector)重载 - 当返回的选择器发出一个项目时,这意味着从最后一个源发射经过一秒钟,可以发射缓冲区。
  2. 选择器Obserbable函数返回的是一个PublishSubject,以便我可以决定何时将排放推送到它。
  3. 有一项Runnable任务将新的排放量推向结束主题。此任务被安排(通过 Android 处理程序)在当前处理的源着陆发射发生后一秒钟运行。当随后在一秒钟内发生新着陆的新源发射时,该任务被取消,并在一秒钟后再次安排新的发射。

这是相关的Android代码:

    final PublishSubject<MotionEvent> touchPublishSubject = PublishSubject.create();

    final ViewGroup viewGroup = (ViewGroup) findViewById(android.R.id.content);
    viewGroup.setOnTouchListener(new View.OnTouchListener() {

        @Override
        public boolean onTouch(View v, MotionEvent event) {
            touchPublishSubject.onNext(event);

            return true;
        }
    });

    final PublishSubject<Object> windowClosePublishSubject = PublishSubject.create();
    final Handler handler = new Handler(Looper.getMainLooper());
    final Runnable r = new Runnable() {

        @Override
        public void run() {
            Log.d(TAG, "will emit closing item");
            windowClosePublishSubject.onNext("now!");
        }
    };

    ViewObservable
            .bindView(viewGroup, touchPublishSubject)
            .filter(new Func1<MotionEvent, Boolean>() {

                @Override
                public Boolean call(MotionEvent motionEvent) {
                    return motionEvent.getAction() == MotionEvent.ACTION_DOWN;
                }
            })
            .doOnNext(new Action1<MotionEvent>() {

                @Override
                public void call(MotionEvent motionEvent) {
                    // restart the timer
                    Log.d(TAG, "cancelling closing");
                    handler.removeCallbacks(r);
                    Log.d(TAG, "scheduling closing");
                    handler.postDelayed(r, 1000L);

                    // show the touch
                    Log.d(TAG, motionEvent.toString());
                }
            })
            .buffer(new Func0<Observable<?>>() {

                @Override
                public Observable<?> call() {
                    Log.d(TAG, "creating buffer closing selector");
                    return windowClosePublishSubject
                            .doOnNext(new Action1<Object>() {

                                @Override
                                public void call(Object o) {
                                    Log.d(TAG, "emitting closing item '" + o + "'");
                                }
                            });
                }
            })
            .subscribe(new Action1<List<MotionEvent>>() {

                @Override
                public void call(List<MotionEvent> motionEvents) {
                    // show number of touch downs
                    Log.d(TAG, "got " + motionEvents.size() + " touch downs");
                }
            });

我不喜欢Handler在这个解决方案中使用和所有这些,所以我进一步研究了。

第二个片段(touchPublishSubject 和 touch listener 完全相同):

  1. 我重用着touchPublishSubject陆生成作为窗口关闭可观察到,首先用 1 秒超时对其进行去抖动
  2. 显然,随着去抖动发生在 上Scheduler.computation(),它移动观察到同一个调度程序,我需要使用observeOn(AndroidSchedulers.mainThread())- 我觉得有点奇怪的是嵌套Observable的调度程序,它只关闭缓冲区窗口,促进整个链在它的调度程序中发生以及

编码:

    final PublishSubject<MotionEvent> touchPublishSubject = PublishSubject.create();

    final ViewGroup viewGroup = (ViewGroup) findViewById(android.R.id.content);
    viewGroup.setOnTouchListener(new View.OnTouchListener() {

        @Override
        public boolean onTouch(View v, MotionEvent event) {
            touchPublishSubject.onNext(event);

            return true;
        }
    });

    ViewObservable
            .bindView(viewGroup, touchPublishSubject)
            .filter(new Func1<MotionEvent, Boolean>() {

                @Override
                public Boolean call(MotionEvent motionEvent) {
                    return motionEvent.getAction() == MotionEvent.ACTION_DOWN;
                }
            })
            .doOnNext(new Action1<MotionEvent>() {

                @Override
                public void call(MotionEvent motionEvent) {
                    // show the touch
                    Log.d(TAG, motionEvent.toString());
                }
            })
            .buffer(new Func0<Observable<?>>() {

                @Override
                public Observable<?> call() {
                    Log.d(TAG, "creating buffer closing selector");
                    return touchPublishSubject
                            .debounce(1L, TimeUnit.SECONDS)
                            .doOnNext(new Action1<Object>() {

                                @Override
                                public void call(Object o) {
                                    Log.d(TAG, "emitting closing item '" + o + "'");
                                }
                            });
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<List<MotionEvent>>() {

                @Override
                public void call(List<MotionEvent> motionEvents) {
                    // show number of touch downs
                    Log.d(TAG, "got " + motionEvents.size() + " touch downs");
                }
            });

这段代码有效,我比第一个更喜欢它,感觉更像是应该用 Rx 来完成。但它很复杂,因为Observable要获得它需要嵌套和大脑体操。我是否buffer缺少自动执行相同操作的重载(即在最后一次发射时间为 1 秒后关闭其窗口)?

编辑:其中一条评论让我意识到 Ben Christensen 的演讲,然后我发现了这个:https ://blog.kaush.co/2015/01/05/debouncedbuffer-with-rxjava/ ,它链接到一些问题的实施。似乎是一个非常普遍的要求,如果有一个内置的运算符来做这个会很好。无论如何,我会考虑这些其他来源中提出的解决方案,这里是此类问题的规范。

4

0 回答 0