0

实际上我已经创建了一个 RxSearch 类型的配置。我在其中附加了一个带有 PublishSubject 的 Edittext textChangeListener。使用事件将字符发送到 Observable,该 Observable 用作改造 API 调用的输入。

问题

我面临的唯一问题是有时我从可观察的 onError() 回调中的 API“流的意外结束”中得到错误。一旦我得到错误,Observable 就会停止工作。Observable 关闭,无法从 PublishSubject 的 onNext() 中获取字符。

看看RxSearchObservable

class RxSearchObservable {
companion object {
    fun fromView(editText: EditText): Observable<String> {
        val subject = PublishSubject.create<String>()
        editText.addTextChangedListener(object : TextWatcher {
            override fun afterTextChanged(s: Editable?) {
                //subject.onComplete()
            }

            override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
                //subject.onNext(s.toString())
            }

            override fun onTextChanged(s: CharSequence, start: Int, before: Int, count: Int) {
                if (s.isNotEmpty()) subject.onNext(s.toString())
            }
        })
        return subject
    }
}
}

我如何在 SwitchMap 中订阅和进行改造 API 调用。

 RxSearchObservable.fromView(edtToolSearch)
                    .debounce(700, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .retryWhen { t -> t.delay(3, TimeUnit.SECONDS) }
                    .switchMap { searchTerm ->
                        runOnUiThread { progressBar.visibility = View.VISIBLE }
                        apiManager.getSearchUnits(searchTerm)
                    }
                    .onErrorResumeNext(Observable.empty())
                    .subscribe({ response ->
                        Log.i("Called subscribe", ":::::::::::+++++++++++++++ GONE")
                        progressBar.visibility = View.GONE
                        if (response.isSuccessful) {
                            val units = response.body()
                            val searchedDatasets = units?.dataset
                            if (searchedDatasets?.size!! > 0) {
                                val searchAdapter = SearchAdapter(this@MapActivity, searchedDatasets, false)
                                listSearch.visibility = View.VISIBLE
                                listSearch.adapter = searchAdapter
                            } else {
                                toast("No items found !!!")
                            }
                        } else {
                            apiError = ErrorUtils.parseError(response)
                            toast(apiError.msg)
                        }
                    }, { t: Throwable? ->
                        progressBar.visibility = View.GONE
                        toast(t?.message.toString())
                    }))

任何想法、帮助、建议将不胜感激。提前致谢。

4

1 回答 1

0

错误终止的流。您可以retry()订阅,但这只能有条件地完成。也许超时,也许只有几次,也许只有某些错误。

在您的情况下,您应该考虑在switchMap. 像这样,错误不会到达主流。

.switchMap { searchTerm ->
    runOnUiThread { progressBar.visibility = View.VISIBLE }
    apiManager.getSearchUnits(searchTerm)
          .onErrorResumeNext(Observable.empty())
}
于 2018-12-20T07:43:46.980 回答