7

我一直在玩 Akka Persistence,并编写了以下程序来测试我的理解。问题是每次运行该程序时都会得到不同的结果。正确答案是 49995000,但我并不总是明白这一点。我已经在每次运行之间清理了日志目录,但没有任何区别。谁能看到出了什么问题?该程序只是将所有从 1 到 n 的数字相加(其中 n 在下面的代码中是 9999)。

正确答案是:(n * (n+1)) / 2。对于 n=9999,即 49995000。

编辑: JDK 8 似乎比 JDK 7 更一致。我应该只使用 JDK 8 吗?

package io.github.ourkid.akka.aggregator.guaranteed

import akka.actor.Actor
import akka.actor.ActorPath
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.persistence.AtLeastOnceDelivery
import akka.persistence.PersistentActor

case class ExternalRequest(updateAmount : Int)
case class CountCommand(deliveryId : Long, updateAmount : Int)
case class Confirm(deliveryId : Long)

sealed trait Evt
case class CountEvent(updateAmount : Int) extends Evt
case class ConfirmEvent(deliveryId : Long) extends Evt

class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery {

  override def persistenceId = "persistent-actor-ref-1"

  override def receiveCommand : Receive = {
    case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState)
    case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState)
  }

  override def receiveRecover : Receive = {
    case evt : Evt => updateState(evt)
  }

  def updateState(evt:Evt) = evt match {
    case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount))
    case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId)
  }
}

class FactorialActor extends Actor {
  var count = 0
  def receive = {
    case CountCommand(deliveryId : Long, updateAmount:Int) => {
      count = count + updateAmount
      sender() ! Confirm(deliveryId)
    }
    case "print" => println(count)
  }
}

object GuaranteedDeliveryTest extends App {
  val system = ActorSystem()

  val factorial = system.actorOf(Props[FactorialActor])

  val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path))

  import system.dispatcher

  system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" }

  for (i <- 1 to 9999)
    delActor ! ExternalRequest(i) 



}

SBT 文件

name := "akka_aggregator"

organization := "io.github.ourkid"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.4"

scalacOptions ++= Seq("-unchecked", "-deprecation")

resolvers ++= Seq(
    "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
)

val Akka  = "2.3.7"
val Spray = "1.3.2"

libraryDependencies ++= Seq(
     // Core Akka
    "com.typesafe.akka" %% "akka-actor" % Akka,
    "com.typesafe.akka" %% "akka-cluster" % Akka,
    "com.typesafe.akka" %% "akka-persistence-experimental" % Akka,
    "org.iq80.leveldb" % "leveldb" % "0.7",
    "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8",

    // For future REST API
    "io.spray" %% "spray-httpx" % Spray,
    "io.spray" %% "spray-can" % Spray,
    "io.spray" %% "spray-routing" % Spray,
    "org.typelevel" %% "scodec-core" % "1.3.0",

    // CSV reader    
    "net.sf.opencsv" % "opencsv" % "2.3",

    // Logging
    "com.typesafe.akka" %% "akka-slf4j" % Akka,
    "ch.qos.logback" % "logback-classic" % "1.0.13",

    // Testing
    "org.scalatest" %% "scalatest" % "2.2.1" % "test",
    "com.typesafe.akka" %% "akka-testkit" % Akka % "test",
    "io.spray" %% "spray-testkit" % Spray % "test",
    "org.scalacheck" %% "scalacheck" % "1.11.6" % "test"
)
fork := true
mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor")

应用程序.conf 文件

##########################################
# Akka Persistence Reference Config File #
##########################################

akka {

  # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "DEBUG"

  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "INFO"

  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  # Protobuf serialization for persistent messages
  actor {

    serializers {

      akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer"
      akka-persistence-message = "akka.persistence.serialization.MessageSerializer"
    }

    serialization-bindings {

      "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
      "akka.persistence.serialization.Message" = akka-persistence-message
    }
  }

  persistence {

    journal {

      # Maximum size of a persistent message batch written to the journal.
      max-message-batch-size = 200

      # Maximum size of a deletion batch written to the journal.
      max-deletion-batch-size = 10000

      # Path to the journal plugin to be used
      plugin = "akka.persistence.journal.leveldb"

      # In-memory journal plugin.
      inmem {

        # Class name of the plugin.
        class = "akka.persistence.journal.inmem.InmemJournal"

        # Dispatcher for the plugin actor.
        plugin-dispatcher = "akka.actor.default-dispatcher"
      }

      # LevelDB journal plugin.
      leveldb {

        # Class name of the plugin.
        class = "akka.persistence.journal.leveldb.LeveldbJournal"

        # Dispatcher for the plugin actor.
        plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

        # Dispatcher for message replay.
        replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"

        # Storage location of LevelDB files.
        dir = "journal"

        # Use fsync on write
        fsync = on

        # Verify checksum on read.
        checksum = off

        # Native LevelDB (via JNI) or LevelDB Java port
        native = on
        # native = off
      }

      # Shared LevelDB journal plugin (for testing only).
      leveldb-shared {

        # Class name of the plugin.
        class = "akka.persistence.journal.leveldb.SharedLeveldbJournal"

        # Dispatcher for the plugin actor.
        plugin-dispatcher = "akka.actor.default-dispatcher"

        # timeout for async journal operations
        timeout = 10s

        store {

          # Dispatcher for shared store actor.
          store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

          # Dispatcher for message replay.
          replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

          # Storage location of LevelDB files.
          dir = "journal"

          # Use fsync on write
          fsync = on

          # Verify checksum on read.
          checksum = off

          # Native LevelDB (via JNI) or LevelDB Java port
          native = on
        }
      }
    }

    snapshot-store {

      # Path to the snapshot store plugin to be used
      plugin = "akka.persistence.snapshot-store.local"

      # Local filesystem snapshot store plugin.
      local {

        # Class name of the plugin.
        class = "akka.persistence.snapshot.local.LocalSnapshotStore"

        # Dispatcher for the plugin actor.
        plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

        # Dispatcher for streaming snapshot IO.
        stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher"

        # Storage location of snapshot files.
        dir = "snapshots"
      }
    }

    view {

      # Automated incremental view update.
      auto-update = on

      # Interval between incremental updates
      auto-update-interval = 5s

      # Maximum number of messages to replay per incremental view update. Set to
      # -1 for no upper limit.
      auto-update-replay-max = -1
    }

    at-least-once-delivery {
      # Interval between redelivery attempts
      redeliver-interval = 5s

      # Maximum number of unconfirmed messages that will be sent in one redelivery burst
      redelivery-burst-limit = 10000

      # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`
      # message will be sent to the actor.
      warn-after-number-of-unconfirmed-attempts = 5

      # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is
      # allowed to hold in memory.
      max-unconfirmed-messages = 100000
    }

    dispatchers {
      default-plugin-dispatcher {
        type = PinnedDispatcher
        executor = "thread-pool-executor"
      }
      default-replay-dispatcher {
        type = Dispatcher
        executor = "fork-join-executor"
        fork-join-executor {
          parallelism-min = 2
          parallelism-max = 8
        }
      }
      default-stream-dispatcher {
        type = Dispatcher
        executor = "fork-join-executor"
        fork-join-executor {
          parallelism-min = 2
          parallelism-max = 8
        }
      }
    }
  }
}

正确的输出:

18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started
0
18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
3974790
24064453
18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
49995000
49995000
49995000
49995000

错误运行:

17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started
0
17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
3727815
22167811
17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
49995000
51084018
51084018
52316760
52316760
52316760
52316760
52316760

另一个错误的运行:

17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started
0
17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
2982903
17710176
49347145
17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
51704199
51704199
55107844
55107844
55107844
55107844
4

1 回答 1

10

您正在使用AtLeastOnceDelivery语义。正如这里所说:

注意至少一次传递意味着原始消息发送顺序并不总是保留,并且目标可能会收到重复的消息。这意味着语义与普通 ActorRef 发送操作的语义不匹配:

由于崩溃后可能重新发送,因此不会保留同一发送方-接收方对的最多一次传递消息顺序,并且目标消息仍然被传递——给新的参与者化身这些语义类似于ActorPath 表示(请参阅 Actor 生命周期),因此您需要在传递消息时提供路径而不是引用。消息通过参与者选择发送到路径。

所以有些号码可能不止一次收到。您可以忽略内部重复的数字FactorialActor或不使用此语义。

于 2014-12-21T19:35:00.280 回答