我在 Android 上,我创建了一个Observable
触地得分。只要新事件在一秒钟内到来,我就想缓冲它们。从最后一次着陆发射经过一秒钟后,我想创建一个所有收集到的事件的列表并发出它,然后开始一个新的收集。
我有 2 个可以工作的代码片段,但它们要么看起来不太被动(#1),要么看起来不太复杂(#2)。他们来了:
- 我使用
buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector)
重载 - 当返回的选择器发出一个项目时,这意味着从最后一个源发射经过一秒钟,可以发射缓冲区。 - 选择器
Obserbable
函数返回的是一个PublishSubject
,以便我可以决定何时将排放推送到它。 - 有一项
Runnable
任务将新的排放量推向结束主题。此任务被安排(通过 Android 处理程序)在当前处理的源着陆发射发生后一秒钟运行。当随后在一秒钟内发生新着陆的新源发射时,该任务被取消,并在一秒钟后再次安排新的发射。
这是相关的Android代码:
final PublishSubject<MotionEvent> touchPublishSubject = PublishSubject.create();
final ViewGroup viewGroup = (ViewGroup) findViewById(android.R.id.content);
viewGroup.setOnTouchListener(new View.OnTouchListener() {
@Override
public boolean onTouch(View v, MotionEvent event) {
touchPublishSubject.onNext(event);
return true;
}
});
final PublishSubject<Object> windowClosePublishSubject = PublishSubject.create();
final Handler handler = new Handler(Looper.getMainLooper());
final Runnable r = new Runnable() {
@Override
public void run() {
Log.d(TAG, "will emit closing item");
windowClosePublishSubject.onNext("now!");
}
};
ViewObservable
.bindView(viewGroup, touchPublishSubject)
.filter(new Func1<MotionEvent, Boolean>() {
@Override
public Boolean call(MotionEvent motionEvent) {
return motionEvent.getAction() == MotionEvent.ACTION_DOWN;
}
})
.doOnNext(new Action1<MotionEvent>() {
@Override
public void call(MotionEvent motionEvent) {
// restart the timer
Log.d(TAG, "cancelling closing");
handler.removeCallbacks(r);
Log.d(TAG, "scheduling closing");
handler.postDelayed(r, 1000L);
// show the touch
Log.d(TAG, motionEvent.toString());
}
})
.buffer(new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
Log.d(TAG, "creating buffer closing selector");
return windowClosePublishSubject
.doOnNext(new Action1<Object>() {
@Override
public void call(Object o) {
Log.d(TAG, "emitting closing item '" + o + "'");
}
});
}
})
.subscribe(new Action1<List<MotionEvent>>() {
@Override
public void call(List<MotionEvent> motionEvents) {
// show number of touch downs
Log.d(TAG, "got " + motionEvents.size() + " touch downs");
}
});
我不喜欢Handler
在这个解决方案中使用和所有这些,所以我进一步研究了。
第二个片段(touchPublishSubject 和 touch listener 完全相同):
- 我重用着
touchPublishSubject
陆生成作为窗口关闭可观察到,首先用 1 秒超时对其进行去抖动 - 显然,随着去抖动发生在 上
Scheduler.computation()
,它移动观察到同一个调度程序,我需要使用observeOn(AndroidSchedulers.mainThread())
- 我觉得有点奇怪的是嵌套Observable
的调度程序,它只关闭缓冲区窗口,促进整个链在它的调度程序中发生以及
编码:
final PublishSubject<MotionEvent> touchPublishSubject = PublishSubject.create();
final ViewGroup viewGroup = (ViewGroup) findViewById(android.R.id.content);
viewGroup.setOnTouchListener(new View.OnTouchListener() {
@Override
public boolean onTouch(View v, MotionEvent event) {
touchPublishSubject.onNext(event);
return true;
}
});
ViewObservable
.bindView(viewGroup, touchPublishSubject)
.filter(new Func1<MotionEvent, Boolean>() {
@Override
public Boolean call(MotionEvent motionEvent) {
return motionEvent.getAction() == MotionEvent.ACTION_DOWN;
}
})
.doOnNext(new Action1<MotionEvent>() {
@Override
public void call(MotionEvent motionEvent) {
// show the touch
Log.d(TAG, motionEvent.toString());
}
})
.buffer(new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
Log.d(TAG, "creating buffer closing selector");
return touchPublishSubject
.debounce(1L, TimeUnit.SECONDS)
.doOnNext(new Action1<Object>() {
@Override
public void call(Object o) {
Log.d(TAG, "emitting closing item '" + o + "'");
}
});
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<MotionEvent>>() {
@Override
public void call(List<MotionEvent> motionEvents) {
// show number of touch downs
Log.d(TAG, "got " + motionEvents.size() + " touch downs");
}
});
这段代码有效,我比第一个更喜欢它,感觉更像是应该用 Rx 来完成。但它很复杂,因为Observable
要获得它需要嵌套和大脑体操。我是否buffer
缺少自动执行相同操作的重载(即在最后一次发射时间为 1 秒后关闭其窗口)?
编辑:其中一条评论让我意识到 Ben Christensen 的演讲,然后我发现了这个:https ://blog.kaush.co/2015/01/05/debouncedbuffer-with-rxjava/ ,它链接到一些问题的实施。似乎是一个非常普遍的要求,如果有一个内置的运算符来做这个会很好。无论如何,我会考虑这些其他来源中提出的解决方案,这里是此类问题的规范。