0

我在使用 Akka 持久性时遇到了意外行为。我对 Akka 还很陌生,所以如果我错过了一些明显的东西,请提前道歉。

我有一个叫 PCNProcessor 的演员。我为我拥有的每个 PCN id 创建一个演员实例。我遇到的问题是,当我创建第一个演员实例时,一切正常,我收到了已处理的响应。但是,当我使用不同的 PCN id 创建更多 PCNProcessor 实例时,我得到了已经处理的 PCN响应。

本质上,由于某种原因,作为第一个 PCN id 处理器的一部分存储的快照被重新应用到后续的 PCN id 实例,即使它与该 PCN 无关并且 PCN id 不同。为了确认这种行为,我在 receiveRecover 中打印了一个日志,并且每个后续 PCNProcessor 实例都会收到不属于它的快照。

我的问题是:

  1. 我是否应该以特定方式存储快照,以便根据 PCN Id 键入它们?然后我应该在上下文中过滤掉与 PCN 无关的快照吗?
  2. 或者 Akka 框架是否应该在幕后处理这个问题,我不必担心根据 PCN id 存储快照。

演员的源代码如下。我确实使用分片。


package com.abc.pcn.core.actors

import java.util.UUID

import akka.actor._
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer}
import com.abc.common.AutoPassivation
import com.abc.pcn.core.events.{PCNNotProcessedEvt, PCNProcessedEvt}

object PCNProcessor {

  import akka.contrib.pattern.ShardRegion
  import com.abc.pcn.core.PCN

  val shardName = "pcn"
  val idExtractor: ShardRegion.IdExtractor = {
    case ProcessPCN(pcn) => (pcn.id.toString, ProcessPCN(pcn))
  }
  val shardResolver: ShardRegion.ShardResolver = {
    case ProcessPCN(pcn) => pcn.id.toString
  }

  // shard settings
  def props = Props(classOf[PCNProcessor])

  // command and response
  case class ProcessPCN(pcn: PCN)

  case class NotProcessed(reason: String)

  case object Processed

}

class PCNProcessor
  extends PersistentActor
  with AtLeastOnceDelivery
  with AutoPassivation
  with ActorLogging {

  import com.abc.pcn.core.actors.PCNProcessor._

  import scala.concurrent.duration._

  context.setReceiveTimeout(10.seconds)

  private val pcnId = UUID.fromString(self.path.name)
  private var state: String = "not started"

  override def persistenceId: String = "pcn-processor-${pcnId.toString}"

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, s: String) =>
      log.info("Recovering. PCN ID: " + pcnId + ", State to restore: " + s)
      state = s
  }

  def receiveCommand: Receive = withPassivation {

    case ProcessPCN(pcn)
      if state == "processed" =>
      sender ! Left(NotProcessed("Already processed PCN"))

    case ProcessPCN(pcn)
      if pcn.name.isEmpty =>
      val error: String = "Name is invalid"
      persist(PCNNotProcessedEvt(pcn.id, error)) { evt =>
        state = "invalid"
        saveSnapshot(state)
        sender ! Left(NotProcessed(error))
      }

    case ProcessPCN(pcn) =>
      persist(PCNProcessedEvt(pcn.id)) { evt =>
        state = "processed"
        saveSnapshot(state)
        sender ! Right(Processed)
      }
  }
}

更新:

注销接收到的快照的元数据后,我可以看到问题是 snapshotterId 没有正确解析,并且总是被设置为 pcn-processor- ${pcnId.toString}而没有解析斜体位。

[信息] [06/06/2015 09:10:00.329] [ECP-akka.actor.default-dispatcher-16] [akka.tcp://ECP@127.0.0.1:2551/user/sharding/pcn/16b3d4dd -9e0b-45de-8e32-de799d21e7c5] 正在恢复。PCN ID:16b3d4dd-9e0b-45de-8e32-de799d21e7c5,快照元数据SnapshotMetadata(pcn-processor-${pcnId.toString},1,1433577553585)

4

2 回答 2

3

我认为您在滥用 Scala 字符串插值功能。
尝试以下方式:

override def persistenceId: String = s"pcn-processor-${pcnId.toString}"

请注意s字符串文字前的使用。

于 2015-06-08T15:38:50.520 回答
0

好的,通过将持久性 ID 更改为以下行来解决此问题:

override def persistenceId: String = "pcn-processor-" + pcnId.toString

原始字符串版本:

override def persistenceId: String = "pcn-processor-${pcnId.toString}"

仅适用于持久化日志,但不适用于快照。

于 2015-06-06T08:20:15.200 回答