5

我有一些使用Monix Observable对文件进行流处理的代码。为了测试这段代码,我希望我对 的操作Observable是类型独立的,所以我也可以在任何其他数据结构上执行它们,List比如 这就是为什么我编写了以下代码来抽象底层数据结构的原因:

def permutations[F[_] : Applicative : FunctorFilter : SemigroupK](chars: F[Char]): F[F[Char]] = {
  Range.inclusive('a', 'z').map(_.toChar)
    .map { c ⇒
      FunctorFilter[F].filter(chars)(Character.toLowerCase _ andThen (_ != c))
    }
    .map(Applicative[F].pure)
    .reduceLeft(SemigroupK[F].combineK)
}

让我烦恼的是,这段代码创建了很多中间数据结构。有没有我可以使用的类型类来提高这个过程的效率?无需太多开销即可将一个数据结构提升到另一个数据结构的东西,例如,LiftIO但对于项目集合?

4

2 回答 2

1

看起来猫对此没有任何帮助。而且 monix 也不是更好,它只实现了来自猫的少数类型类。

所以,我最好的猜测是自己定义这样的类型类:

import monix.execution.Scheduler.Implicits.global
import cats._
import cats.implicits._
import monix.reactive._

object Test {

  def main(args: Array[String]): Unit = {

    println(permutations(List('a', 'b', 'c')))

    permutations(Observable('a', 'b', 'c')).foreach{c =>
      print("Observable(")
      c.foreach(c1 => print(c1 + " "))
      print(") ")
    }
  }

  def permutations[F[_] : Applicative](chars: F[Char])(implicit seq: Sequence[F], fil: Filter[F]): F[F[Char]] = {

    val abc = seq.fromIterable(
      Range.inclusive('a', 'z').map(_.toChar)
    )

    abc.map(c => fil.filter(chars)(_ != c))
  }

  trait Sequence[F[_]] {

    def fromIterable[A](f: Iterable[A]): F[A]
  }

  implicit val listSequence: Sequence[List] = new Sequence[List] {

    def fromIterable[A](f: Iterable[A]): List[A] = f.toList
  }

  implicit val observableSequence: Sequence[Observable] = new Sequence[Observable] {

    def fromIterable[A](f: Iterable[A]): Observable[A] = Observable.fromIterable(f)
  }

  trait Filter[F[_]] {

    def filter[A](fa: F[A])(f: A => Boolean): F[A]
  }

  implicit val observableFilterFunctor: Filter[Observable] = new Filter[Observable] {

    def filter[A](fa: Observable[A])(f: A => Boolean): Observable[A] =
      fa.filter(f)
  }

  implicit val listFilterFunctor: Filter[List] = new Filter[List] {

    def filter[A](fa: List[A])(f: A => Boolean): List[A] =
      fa.filter(f)
  }

}

结果:

List(List(b, c), List(a, c), List(a, b), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c))
Observable(b c ) Observable(a c ) Observable(a b ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) 

遗憾的是,我无法在 scalafiddle 或 scastie 上使用它,因为两者都不提供正确的1.5.0cat ( ) 和 monix ( 3.0.0-M3) 版本。

我仍然希望这会有所帮助。

于 2018-12-05T10:19:12.263 回答
0

尽管创建可重用函数很有用,但您Observable无需这样做即可轻松进行测试。

我建议将逻辑处理和副作用消费者分开

object StreamProcessing {
  def processItems(obs: Observable[Input]): Observable[Result] = ???
}

在产品中,你会做

val eventsStream: Observable[Input] = ???
val eventsConsumer: Consumer[Input, Output] = ???

StreamProcessing(myEventsStream).consumeWith(eventsConsumer)

然后,在您的测试中,您可以模拟您的测试数据,断言列表结果。此外,通过测试 Observable,您可以获得控制时间的能力TestScheduler,这使得测试变得轻而易举。

implicit val sc = TestScheduler()

val testData: List[Input] = ???
val expected: List[Output] = ???

val res = StreamProcessing(Observable.fromIterable(testData))
  .toListL
  .runToFuture

sc.tick()

assert(res.value, Some(Success(expected))

于 2019-11-16T17:48:27.963 回答