6

说未引用的演员仍然订阅事件流是否正确?至少,这是我从 Akka 实验中得到的……

我正在尝试在 EventBus 场景中为参与者实现弱引用。在这些情况下,事件侦听器/参与者通常来来去去。与应该一直在场的独立演员不同。当然,显式注销确实有效。但我并不总是能够感知到正确的时机来做这件事。

Akka 是否提供这种用例?

val as = ActorSystem.create("weak")
var actor = as.actorOf(Props[ExceptionHandler])
as.eventStream.subscribe(actor,classOf[Exception])

// an event is published & received
as.eventStream.publish(new KnownProblem)

//session expires or whatever that makes the actor redundant
actor = null
(1 to 30).foreach(_ => System.gc)

// an event is published & STILL received
as.eventStream.publish(new KnownProblem)
4

1 回答 1

0

好的,我实际上无法实现它,但演员正在 GC 上停止。使用 Scala 2.9.2 (REPL) + Akka 2.0.3。

EventBuswithWeakReference[ActorRef]没有帮助——因为在 Akka 中你也有一个with dungeon( ChildrenContainer) self.children,也可以Monitor订阅生命周期事件。我没有尝试过的事情 - 使用只知道我们新闪亮的调度程序创建演员WeakEventBus- 所以也许我错过了重点?

下面是 REPL 的代码(从适当的导入开始,:paste分两步):

// Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"

// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID

case class Message(val id:String,val timestamp: Long)
case class PostMessage(
  override val id:String=UUID.randomUUID().toString(), 
  override val timestamp: Long=new java.util.Date().getTime(), 
  text:String) extends Message(id, timestamp)  
case class MessageEvent(val channel:String, val message:Message)

case class StartServer(nodeName: String)
case class ServerStarted(nodeName: String, actor: ActorRef) 
case class IsAlive(nodeName: String)
case class IsAliveWeak(nodeName: String)
case class AmAlive(nodeName: String, actor: ActorRef)
case class GcCheck()
case class GcCheckScheduled(isScheduled: Boolean, 
  gcFlag: WeakReference[AnyRef])

trait WeakLookupClassification { this: WeakEventBus ⇒
protected final val subscribers = new Index[Classifier, 
  WeakReference[Subscriber]](mapSize(), 
    new Comparator[WeakReference[Subscriber]] {
          def compare(a: WeakReference[Subscriber], 
        b: WeakReference[Subscriber]): Int = { 
              if (a.get == None || b.get == None) -1
              else compareSubscribers(a.get.get, b.get.get)
        }
      })  
protected def mapSize(): Int
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
protected def classify(event: Event): Classifier
protected def publish(event: Event, subscriber: Subscriber): Unit
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = 
  subscribers.put(to, new WeakReference(subscriber))
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = 
  subscribers.remove(from, new WeakReference(subscriber))
def unsubscribe(subscriber: Subscriber): Unit = 
  subscribers.removeValue(new WeakReference(subscriber))
def publish(event: Event): Unit = {
      val i = subscribers.valueIterator(classify(event))
      while (i.hasNext) publish(event, i.next().get.get)
}
  }

class WeakEventBus extends EventBus with WeakLookupClassification {
  type Event = MessageEvent
  type Classifier=String   
  type Subscriber = ActorRef

  protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b

  protected def mapSize(): Int = 10
  protected def classify(event: Event): Classifier = event.channel
  protected def publish(event: Event, subscriber: Subscriber): Unit =
      subscriber ! event
}

lazy val weakEventBus = new WeakEventBus

implicit val timeout = akka.util.Timeout(1000)
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString(""" 
akka {
  loglevel = "DEBUG"
  actor {
        provider = "akka.remote.RemoteActorRefProvider"
        debug {
          receive = on 
          autoreceive = on        
          lifecycle = on
          event-stream = on
        }
  }
  remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        log-sent-messages = on 
        log-received-messages = on    
  }
}
serverconf {
  include "common"
  akka {
        actor {
          deployment {
        /root {
          remote = "akka://serversys@127.0.0.1:2552"
        }    
          }
        }
        remote {
          netty {
        hostname = "127.0.0.1"
        port = 2552
          }
        }
  }
}
""").getConfig("serverconf"))

class Server extends Actor {
  private[this] val scheduled = new AtomicBoolean(false)
  private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()

  val gcCheckPeriod = Duration(5000, "millis")

  override def preRestart(reason: Throwable, message: Option[Any]) {
        self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)
        super.preRestart(reason, message)
  }

  def schedule(period: Duration, who: ActorRef) = 
        actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck)  

  def receive = {    
        case StartServer(nodeName) => 
          sender ! ServerStarted(nodeName, self)
          if (scheduled.compareAndSet(false, true)) 
        schedule(gcCheckPeriod, self)
          val gcFlagObj = new AnyRef()              
          gcFlagRef.set(new WeakReference(gcFlagObj))
          weakEventBus.subscribe(self, nodeName)
          actorSystem.eventStream.unsubscribe(self)      
        case GcCheck =>
          val gcFlag = gcFlagRef.get
          if (gcFlag == null) {
        sys.error("gcFlag")
          }
         gcFlag.get match {
        case Some(gcFlagObj) =>
          scheduled.set(true)
          schedule(gcCheckPeriod, self)  
        case None =>
          println("Actor stopped because of GC: " + self)
          context.stop(self)        
        }
        case GcCheckScheduled(isScheduled, gcFlag) =>
          if (isScheduled && scheduled.compareAndSet(false, isScheduled)) {
        gcFlagRef.compareAndSet(null, gcFlag)
        schedule(gcCheckPeriod, self)            
          }
        case IsAlive(nodeName) =>
          println("Im alive (default EventBus): " + nodeName)
          sender ! AmAlive(nodeName, self)
        case e: MessageEvent => 
          println("Im alive (weak EventBus): " + e)       
    }   
} 

// :paste 2/2
class Root extends Actor { 
  def receive = {
      case start @ StartServer(nodeName) =>
        val server = context.actorOf(Props[Server], nodeName)
        context.watch(server)
        Await.result(server ? start, timeout.duration)
      .asInstanceOf[ServerStarted] match {
        case started @ ServerStarted(nodeName, _) => 
          sender ! started
        case _ => 
          throw new RuntimeException(
            "[S][FAIL] Could not start server: " + start)
        }
      case isAlive @ IsAlive(nodeName) =>
        Await.result(context.actorFor(nodeName) ? isAlive, 
      timeout.duration).asInstanceOf[AmAlive] match {
        case AmAlive(nodeName, _) => 
          println("[S][SUCC] Server is alive : " + nodeName)
        case _ => 
      throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)    
          }
      case isAliveWeak @ IsAliveWeak(nodeName) =>                
        actorSystem.eventStream.publish(MessageEvent(nodeName, 
      PostMessage(text="isAlive-default")))  
        weakEventBus.publish(MessageEvent(nodeName, 
      PostMessage(text="isAlive-weak")))  
}
  }

lazy val rootActor = actorSystem.actorOf(Props[Root], "root")

object Root {
  def start(nodeName: String) = {
        val msg = StartServer(nodeName)
        var startedActor: Option[ActorRef] = None
        Await.result(rootActor ? msg, timeout.duration)
      .asInstanceOf[ServerStarted] match {
            case succ @ ServerStarted(nodeName, actor) => 
          println("[S][SUCC] Server started: " + succ)
          startedActor = Some(actor)
            case _ => 
      throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
          }
        startedActor  
  }
  def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName)
  def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName)
}

////////////////
// actual test 
Root.start("weak")
Thread.sleep(7000L)
System.gc()
Root.isAlive("weak")
于 2013-01-02T22:13:24.447 回答