0

背压有问题。使用发布主题在发出时获取传感器事件,并且需要在事务中订阅主题时将数据保存到数据库。

我一直在尝试使用 .window(100) 运算符,因此每当我连续获得 100 个传感器事件时,我都可以批量插入,但我只能在 .subscribe() 获得一项

不想通过使用缓冲区运算符来删除事件。处理这个问题的正确方法是什么?

@Override
public void onSensorChanged(SensorEvent sevent) {

    Sensor sensor = sevent.sensor;

    switch (sensor.getType()) {
        case Sensor.TYPE_ACCELEROMETER:
            sensorEventPublishSubject.onNext(sevent);
            break;
    }
}

sensorEventPublishSubject
            .map(event ->
                    new AccModel(
                            event.values[0],
                            event.values[1],
                            event.values[2],
                            event.accuracy                           
                    )
            )
            .window(100)
            .subscribe(
                    new Action1<Observable<AccModel>>() {
                        @Override
                        public void call(Observable<AccModel> accModelObservable) {
                            //insert in db
                        }
                    }
            );
4

1 回答 1

1

您有两种选择,具体取决于您要对 onError 事件执行的操作。

首先,您使用的解决方案.window是正确的,只是它发出一个 Observable,每 100 个事件您将获得一个 Observable,并且当您订阅它时,该 Observable 将重播这 100 个事件。此外,在发生错误的情况下,它也会按顺序重播错误事件(AFAIK)。

如果您不关心序列中的错误事件,那么有解决方案.buffer(100), 您应该将其放在前面,onErrorReturn()或者onErrorResumeNext()您将使用它来将onError事件转换为onNext. 这是因为如果是onError,buffer运算符会立即传播它,因此您会丢失临时缓冲区(<100)中的事件。

于 2016-05-25T21:56:38.150 回答