2

我想要实现的是可序列化和可组合的进程描述符。基本上我将创建一些原始Processor(可序列化),然后我希望能够将它们组合成更高Processor的顺序,然后整个事情应该保持自动可序列化。这是我当前的实现,但是我怀疑使用某些猫类型类/数据结构可以使用更优雅的方法来执行此操作。我想不出一种方法来使用那些强大的工具,如 Free、Kleisli 或 State,我感到很愚蠢。我的挑战是我的状态类型,即 中的数据字段DataWithContext,不断变化。

但一定有办法克服,不是吗?

object Test {
  import cats.implicits._
  import cats.data.XorT
  import scala.concurrent.Future

  type Cause = String

  case class DataWithContext[+A](data: A, context: List[String]) //context never need to change

  trait Processor[-A, B] extends Serializable {
    def process: DataWithContext[A] ⇒ XorT[Future, Cause, B]
  }

  implicit class ProcessorOps[A, B](self: Processor[A, B]) {
    def >>[C](that: Processor[B, C]) = Con(self, that)
    def zip[C](that: Processor[A, C]) = Zip(self, that)
  }

  //concat two processors
  case class Con[A, B, C](a: Processor[A, C], b: Processor[C, B]) extends Processor[A, B] {
    def process: DataWithContext[A] ⇒ XorT[Future, Cause, B] = (pc: DataWithContext[A]) ⇒
      a.process(pc).flatMap { c ⇒
        b.process(pc.copy(data = c))
      }
  }

  //zip two processors
  case class Zip[A, B, C](p1: Processor[A, B], p2: Processor[A, C])
    extends Processor[A, (B, C)] {
    def process: DataWithContext[A] ⇒ XorT[Future, Cause, (B, C)] = 
      (pc: DataWithContext[A]) ⇒
        for {
          b ← p1.process(pc)
          c ← p2.process(pc)
        } yield (b, c)
  }

  //an example of a primitive Processor
  case object Count extends Processor[String, Int] {
    def process: DataWithContext[String] ⇒ XorT[Future, Cause, Int] =
     (dc: DataWithContext[String]) => 
       XorT.pure[Future, Cause, Int](dc.data.length)
  }

}
4

1 回答 1

3

我有 :

  • Processor.process了一个Kleisli
  • zipand>>方法移至Processor特征本身。
  • 引入了一些类型别名。

结果是:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import cats.implicits._
import cats.data.{XorT, Kleisli}
import cats.Apply

type Cause = String
type Result[A] = XorT[Future, Cause, A]
type Process[A, B] = Kleisli[Result, DataWithContext[A], B]

case class DataWithContext[+A](data: A, context: List[String]) 

implicit class ContextOps[A](a: A) {
  def withContext(ctx: List[String]) = DataWithContext(a, ctx)
}

trait Processor[A, B] extends Serializable { self => 
  val process: Process[A, B]

  def andThen[C](that: Processor[B, C]): Processor[A, C] = 
    Processor.instance(Kleisli { dc => 
      (process.map(_.withContext(dc.context)) andThen that.process).run(dc)
    })

  // alias for andThen
  def >>[C](that: Processor[B, C]) = this andThen that

  def zip[C](that: Processor[A, C]): Processor[A, (B, C)] = 
    Processor.instance(Kleisli { dc => 
      Apply[Result].tuple2(self.process.run(dc), that.process.run(dc))
    })
}

object Processor {
  // create a Processor from a Process 
  def instance[A, B](p: Process[A, B]) = new Processor[A, B] {
    val process = p
  }
}

可以用作:

object Count extends Processor[String, Int] {
  val process: Process[String, Int] =
    Kleisli[Result, DataWithContext[String], Int] {
      dc => XorT.pure[Future, Cause, Int](dc.data.length)
    }
}

val times2: Processor[Int, Int] = Processor.instance(
  Kleisli[Result, DataWithContext[Int], Int] ( 
    dc => XorT.pure[Future, Cause, Int](dc.data * 2)))

(Count zip Count).process.run("hello".withContext(List("Context"))) map println
// (5,5)
(Count >> times2).process.run("hello".withContext(List("Context"))) map println
// 10
于 2016-01-01T16:04:41.717 回答