1

我正在编写以下代码片段以从 firebase 数据库中获取已保存食物的列表,然后使用该列表,我再次从 firebase 数据库中获取个人食物详细信息。

以下代码工作正常,除了我无法弄清楚如何让第二个 flatMap 知道第一个 flatMap 的发射已经完成(所有食物列表都已处理)。所以我无法调用onCompleted()方法,因此无法检测到整个过程何时完成。

查看以下代码段中的评论:

Observable.create<List<PersonalizedFood>> {

            FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener {
                override fun onCancelled(p0: DatabaseError?) {

                }

                override fun onDataChange(p0: DataSnapshot?) {
                    val list = ArrayList<PersonalizedFood>()
                    p0?.let {
                        for (dateObject in p0.children) {
                            for (foodItem in dateObject.children) {
                                val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood
                                list.add(PersonalizedFood(food))
                            }
                        }
                    }
                    it.onNext(list)
                    it.onCompleted()
                }
            })
        }.subscribeOn(Schedulers.io()).flatMap {
            Observable.from(it) // returning a Observable that emits items of list ("it" is the list here) 
        }.observeOn(Schedulers.io()).flatMap {
        // How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called.
            personalizedFood ->

            Observable.create<Boolean>{
                FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{
                    override fun onCancelled(p0: DatabaseError?) {
                        it.onError(p0?.toException())
                    }

                    override fun onDataChange(p0: DataSnapshot?) {
                        if(p0 != null) {
                            val food = p0.getValue(FBFood::class.java)!!
                            val repo = LocalFoodRepository()
                            doAsync {
                                repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
                                repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
                                repo.saveFood(this@LoginActivity, personalizedFood)
                                it.onNext(true)
                            }

                        }else {
                            it.onNext(false)
                        }
                    }

                })
            }
        }.observeOn(Schedulers.io()).doOnCompleted{
            dismissProgressDialog()
            finish()
        }.doOnError{
            it.printStackTrace()
            dismissProgressDialog()
            finish()
        }.subscribe()

谢谢。

4

1 回答 1

2

当它Observable发出flatMap的所有可观察对象都调用onCompleted(). 代码中的第二个flatMap从不调用onCompleted(),因为它创建的任何可观察对象都没有调用onCompleted()

你应该调用onCompleted()你的onDataChange()方法。由于每个创建的 observableflatMap都只发出一个 item,因此可以在方法之后直接调用它onNext()

override fun onDataChange(p0: DataSnapshot?) {
    if(p0 != null) {
        val food = p0.getValue(FBFood::class.java)!!
        val repo = LocalFoodRepository()
        doAsync {
            repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
            repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
            repo.saveFood(this@LoginActivity, personalizedFood)
            it.onNext(true)
            it.onCompleted()
        }
    } else {
        it.onNext(false)
        it.onCompleted()
    }
}
于 2017-07-19T20:08:50.697 回答