I have set up a direct exchange on my RabbitMQ server. I publish messages to it as follows:

val message = Json.toJson(myMessage).toString
subscriber.rabbitControl ! Message.exchange(message, routingKey = "MyMessage", exchange = "ipsmse")

My subscriber tries to read these messages:

class SubscriberImpl @Inject()(actorSystem: ActorSystem, lifecycle: ApplicationLifecycle) extends Subscriber {

  override val rabbitControl = actorSystem.actorOf(Props[RabbitControl])

  import MyMessage._
  implicit val myMessageFormat = Json.format[MyMessage]
  implicit val recoveryStrategy = RecoveryStrategy.none
  val subscriptionRef = Subscription.run(rabbitControl) {
    import Directives._
    channel(qos = 3) {
      consume(Queue.passive("MyMessage")) {
        (body(as[MyMessage]) & routingKey) { (myMessage, key) =>
          println(s"""A message '${myMessage.toString}' was received over '$key'.""")
          ack
        }
      }
    }
  }

  lifecycle.addStopHook { () =>
    Future(subscriptionRef.close())
  }
}

When I run the Play application, publishing appears to work, since the message shows up in the queue:

rabbitmqadmin --vhost="ipsmsvh" get queue=MyMessage requeue=false -u ... -p ...     

+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
|    routing_key     | exchange | message_count |                                                                       payload                                                                        | payload_bytes | payload_encoding | redelivered |
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
| MyMessage          | ipsmse   | 8             | {"id":"1bef63f4-2854-41c1-9736-3fe913eec307","createdAt":1484161631674,"sMSMessage":{"from":"from1","to":"to1","text":"text1","subject":"subject1"}} | 148           | string           | True        |
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+    

But the subscriber fails with an error, because it expects a different content type:

MyMessage - exception while processing message. Body={"id":"70a43b48-51b8-4adc-86d0-6a250c1d89a8","createdAt":1484161760382,"sMSMessage":{"from":"from1","to":"to1","text":"text1","subject":"subject1"}}. Envelope=Envelope(deliveryTag=1, redeliver=true, exchange=ipsmse, routingKey=MyMessage)
com.spingo.op_rabbit.MismatchedContentType: MismatchedContentType: expected 'application/json', received 'text/plain'
  at com.spingo.op_rabbit.PlayJsonSupport$$anon$2.unmarshall(PlayJsonSupport.scala:53)  

Here is my marshaller/unmarshaller:

object Marshaller {

  implicit val myMessageMarshaller =
    new RabbitMarshaller[MyMessage]
        with RabbitUnmarshaller[MyMessage] {
      val contentType = "application/json"
      val contentEncoding = Some("UTF-8")

      override def marshall(value: MyMessage) =
        value.toString.getBytes

      override def unmarshall(
          value: Array[Byte],
          contentType: Option[String],
          charset: Option[String]): MyMessage = {
        val json = Json.parse(new String(value))
        MyMessage.myMessageReads.reads(json).getOrElse(new MyMessage(null, null, null))
      }
    }

}

Any ideas on what might be causing the error and how to fix it? Is it just a matter of setting the content type, and if so, how do I set it? Thanks!
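
One lead I have not confirmed: the publishing code passes a String (the result of Json.toJson(myMessage).toString), not a MyMessage, so the marshaller picked at the publish site may be one for String rather than the JSON one. The sketch below is a self-contained model of that idea only. The Marshaller trait, Publisher.contentTypeFor and the simplified MyMessage are hypothetical stand-ins and not op-rabbit's API; the assumption that a String marshaller tags messages as 'text/plain' is inferred from the error message above.

```scala
// Hypothetical stand-in for a marshaller type class; NOT op-rabbit's actual API.
trait Marshaller[T] {
  def contentType: String
  def marshall(value: T): Array[Byte]
}

// Simplified, hypothetical version of the message type, for illustration only.
final case class MyMessage(id: String, text: String)

object Marshaller {
  // Selected whenever the published value has static type String.
  // Assumption: such a marshaller labels the payload 'text/plain'.
  implicit val stringMarshaller: Marshaller[String] = new Marshaller[String] {
    val contentType = "text/plain"
    def marshall(value: String): Array[Byte] = value.getBytes("UTF-8")
  }

  // Selected whenever the published value has static type MyMessage.
  implicit val myMessageMarshaller: Marshaller[MyMessage] = new Marshaller[MyMessage] {
    val contentType = "application/json"
    def marshall(value: MyMessage): Array[Byte] =
      s"""{"id":"${value.id}","text":"${value.text}"}""".getBytes("UTF-8")
  }
}

object Publisher {
  // Returns the content type that would accompany the outgoing message.
  // The marshaller is resolved from the static type T at the call site.
  def contentTypeFor[T](value: T)(implicit m: Marshaller[T]): String = m.contentType
}

object Demo {
  val msg = MyMessage("1", "text1")

  // Converting to a String first selects the String marshaller.
  val asString: String = Publisher.contentTypeFor(msg.toString)

  // Passing the object itself selects the MyMessage marshaller.
  val asObject: String = Publisher.contentTypeFor(msg)
}
```

If op-rabbit resolves its marshaller the same way, then publishing the MyMessage value itself (with a JSON marshaller in implicit scope) rather than the pre-serialized string might be what sets the content type to 'application/json', but I have not verified this against the library.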
