2

使用 Akka 将 redis 与我的 Scala 应用程序集成,但由于某种原因它没有收到任何消息。我可以通过在命令行上打开 redis-cli 来确认 redis 确实有大量流量。

在 pSubscribe 之后,它会收到:subscribed to * and count = 1

我的猜测是,这可能与 Akka 设置接收回调的方式有关。由于一些冲突,我不得不在 scala-redis 库中删除 Scala Actors 并用 Akka Actors 替换它们。

这是代码:

订阅者演员

class Subscriber(client: RedisClient) extends Actor {
  var callback: PubSubMessage => Any = { m => }

  def receive: Receive = { 
    case Subscribe(channels) =>
      client.subscribe(channels.head, channels.tail: _*)(callback)

    case pSubscribe(channels) =>
      client.pSubscribe(channels.head, channels.tail: _*)(callback)

    case pSubscribeAll(channels) =>
      Logger.info("Subscribing to all channels")
      client.pSubscribe(channels.head, channels.tail: _*)(callback)

    case Register(cb) =>
      Logger.info("Callback is registered")
      callback = cb

    case Unsubscribe(channels) =>
      client.unsubscribe(channels.head, channels.tail: _*)

    case UnsubscribeAll =>
      client.unsubscribe
  }
}

初始化订阅者

class RelaySub extends Actor {

  // important config values
  val system = ActorSystem("pubsub")
  val conf = play.api.Play.current.configuration
  val relayPubHost = conf.getString("relays.redis.host").get
  val relayPubPort = conf.getInt("relays.redis.port").get

  val rs = new RedisClient(relayPubHost, relayPubPort)
  val s = system.actorOf(Props(new Subscriber(rs)))
  s ! Register(callback) 
  s ! pSubscribeAll(Array("*"))
  Logger.info("Engine Relay Subscriber has started up")

  def receive: Receive = {      
    case Register(callback) =>
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    case S(channel, no) => Logger.info("subscribed to " + channel + " and count = " + no)
      case U(channel, no) => Logger.info("unsubscribed from " + channel + " and count = " + no)
      case M(channel, msg) => 
        msg match {
          // exit will unsubscribe from all channels and stop subscription service
          case "exit" => 
            Logger.info("unsubscribe all ... no handler yet ;)")

          // message "+x" will subscribe to channel x
          case x if x startsWith "+" => 
            Logger.info("subscribe to ... no handler yet ;)")

          // message "-x" will unsubscribe from channel x
          case x if x startsWith "-" => 
            Logger.info("unsubscribe from ... no handler yet ;)")

          // other message receive
          case x => 
            Logger.info("Engine: received redis message")
            val channelVars = channel.split(".").toArray[String]
            if(channelVars(0)!=Engine.instanceID)
                channelVars(1) match {
                  case "relay" => 
                    EngineSyncLocal.constructRelay(channel, msg)
                  case _ => 
                    Logger.error("Engine: received unknown redis message")
                }
        }
  }
}

谢谢你的帮助!

4

2 回答 2

2

我发现了问题。它似乎是 scala-redis 客户端中的一个错误。

我在消费者类中添加了一些日志记录并开始收到Engine: weird message错误,这意味着它无法识别传入的流量。我会联系作者并提出拉取请求。

编码:

class Consumer(fn: PubSubMessage => Any) extends Runnable {

    def start () {
      val myThread = new Thread(this) ;
      myThread.start() ;
    }

    def run {
      whileTrue {
        asList match {
          case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
            Logger.info("Engine: redis traffic")
            msgType match {
              case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
              case "unsubscribe" if (data.toInt == 0) => 
                fn(U(channel, data.toInt))
                break
              case "punsubscribe" if (data.toInt == 0) => 
                fn(U(channel, data.toInt))
                break
              case "unsubscribe" | "punsubscribe" => 
                fn(U(channel, data.toInt))
              case "message" | "pmessage" => 
                fn(M(channel, data))
              case x => throw new RuntimeException("unhandled message: " + x)
            }
          case _ => Logger.error("Engine: weird redis message")
        }
      }
    }
  }
于 2012-05-28T19:19:13.287 回答
0
            case x => throw new RuntimeException("unhandled message: " + x)
          }
case Some(Some("pmessage")::Some(pattern)::Some(channel):: Some(message)::Nil)=>
              fn(M(channel, message))

asList 匹配缺少一个案例

于 2012-10-17T16:31:32.440 回答