10

我正在尝试在我的 ES-CQRS 架构中实现读取端。假设我有一个这样的持久演员:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

据我了解,每次事件持续存在时,我们都可以通过Akka Persistence Query. 现在,我不确定订阅这些事件的正确方法是什么,以便我可以将它保存在我的读取端数据库中?其中一个想法是最初UsersStream从我的读取方actor向UserWriteactor发送消息,并在该读取actor中“接收”事件。

编辑

根据@cmbaxter 的建议,我以这种方式实现了读取端:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

有一些问题,例如: 事件流似乎很慢。即UserRead演员可以在保存新添加的用户之前用一组用户回答。

编辑 2

我增加了 cassandra 查询日志的刷新间隔,这更多地解决了慢事件流的问题。默认情况下,Cassandra 事件日志似乎每 3 秒轮询一次。在我的application.conf我补充说:

cassandra-query-journal {
  refresh-interval = 20ms
}

编辑 3

实际上,不要减少刷新间隔。这会增加内存使用量,但这并不危险,也不是一点。CQRS 的一般概念是写入端和读取端是异步的。因此,在您写入数据后将永远无法立即读取。处理用户界面?在读取端确认它们之后,我只需打开流并通过服务器发送的事件推送数据。

4

2 回答 2

6

有一些方法可以做到这一点。例如,在我的应用程序中,我的查询端有一个参与者,它有一个持续查找更改的 PersistenceQuery,但您也可以有一个具有相同查询的线程。问题是保持流打开,以便能够在持久事件发生时立即读取它

val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
  CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach{
  case userEvent: UserEvent => {
    doSomething(userEvent)
  }
}

取而代之的是,您可以使用一个计时器来引发 PersistenceQuery 并存储新事件,但我认为打开流是最好的方法

于 2016-07-11T10:58:18.753 回答
4

尽管仅使用 PersistenceQuery 的解决方案获得了批准,但它包含以下问题:

  1. 它是部分的,只有读取 EventEnvelopes 的方法。
  2. 它不能与状态快照一起使用,因此,CQRS 阅读器部分应该覆盖所有持久化的事件。

第一个解决方案更好,但存在以下问题:

  1. 这太复杂了。它导致用户不必要地处理序列号。
  2. 代码处理状态(查询/更新)也与 Actors 实现相结合。

存在更简单的一种:

import akka.NotUsed
import akka.actor.{Actor, ActorLogging}
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal}
import akka.persistence._
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Source

/**
  * Created by alexv on 4/26/2017.
  */
class CQRSTest {

  // User Command, will be transformed to User Event
  sealed trait UserCommand
  // User Event
  // let's assume some conversion from Command to event here
  case class PersistedEvent(command: UserCommand) extends Serializable
  // User State, for simplicity assumed that all State will be snapshotted
  sealed trait State extends Serializable{
    def clear(): Unit
    def updateState(event: PersistedEvent): Unit
    def validateCommand(command:UserCommand): Boolean
    def applyShapshot(newState: State): Unit
    def getShapshot() : State
  }
  case class SaveSnapshot()

  /**
    * Common code for Both reader and writer
    * @param state - State
    */
  abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging {
    override def persistenceId: String = "CQRSPersistenceId"

    override def preStart(): Unit = {
      // Since the state is external and not depends to Actor's failure or restarts it should be cleared.
      state.clear()
    }

    override def receiveRecover: Receive = {
      case event : PersistedEvent => state.updateState(event)
      case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot)
      case RecoveryCompleted  => onRecoveryCompleted(super.lastSequenceNr)
    }

    abstract def onRecoveryCompleted(lastSequenceNr:Long)
  }

  class CQRSWriter(state: State) extends CQRSCore(state){
    override def preStart(): Unit = {
      super.preStart()
      log.info("CQRSWriter Started")
    }

    override  def onRecoveryCompleted(lastSequenceNr: Long): Unit = {
      log.info("Recovery completed")
    }

    override def receiveCommand: Receive = {
      case command: UserCommand =>
        if(state.validateCommand(command)) {
          // Persist events and call state.updateState with each persisted event
          persistAll(List(PersistedEvent(command)))(state.updateState)
        }
        else {
          log.error("Validation Failed for Command: {}", command)
        }
      case SaveSnapshot => saveSnapshot(state.getShapshot())
      case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata)
      case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason)
    }
  }

  class CQRSReader(state: State) extends CQRSCore(state){
    override def preStart(): Unit = {
      super.preStart()
      log.info("CQRSReader Started")
    }

    override  def onRecoveryCompleted(lastSequenceNr: Long): Unit = {
      log.info("Recovery completed, Starting QueryStream")

      // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests)
      val readJournal = PersistenceQuery(context.system).readJournalFor(
        context.system.settings.config.getString("akka.persistence.query.my-read-journal"))
        .asInstanceOf[ReadJournal
        with EventsByPersistenceIdQuery]
      val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
        OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue)
      source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer())

    }

    // Nothing received since it is Reader only
    override def receiveCommand: Receive = Actor.emptyBehavior
  }
}
于 2017-04-26T13:29:37.033 回答