0

我有一个流ResponseMessage可以是不同的子类型。我想将流拆分为流,我可以在其自己的流中处理每种类型。

我的第一次尝试导致了我看不到的结果。

file.readLines()
        .toObservable()
        .map { mapper.readValue(it, ResponseMessage::class.java) }
        .groupBy { when(it) {
            is MarketChangeMessage -> it::class
            else -> it::class
        }}
        .map { it.????? } //How can possible this work?

我现在的问题是:将流划分为一种特定子类型的流的惯用方法是什么?

4

1 回答 1

3

您可以使用ofType运算符:

ofType( ) —— 只从源 Observable 中发出属于特定类的那些项目。

例子:

val messages = file.readLines()
    .toObservable()
    .map { mapper.readValue(it, ResponseMessage::class.java) }
    .share() // <-- or other multicasting operator

messages
    .ofType(MarketChangeMessage::class)
    .subscribe()

messages
    .ofType(Other::class)
    .subscribe()
于 2017-11-14T20:30:19.893 回答