I have set up a direct exchange on my RabbitMQ server. I publish messages to it as follows:
val message = Json.toJson(myMessage).toString
subscriber.rabbitControl ! Message.exchange(message, routingKey = "MyMessage", exchange = "ipsmse")
My subscriber tries to read these messages:
class SubscriberImpl @Inject()(actorSystem: ActorSystem, lifecycle: ApplicationLifecycle) extends Subscriber {
  override val rabbitControl = actorSystem.actorOf(Props[RabbitControl])
  import MyMessage._
  implicit val myMessageFormat = Json.format[MyMessage]
  implicit val recoveryStrategy = RecoveryStrategy.none
  val subscriptionRef = Subscription.run(rabbitControl) {
    import Directives._
    channel(qos = 3) {
      consume(Queue.passive("MyMessage")) {
        (body(as[MyMessage]) & routingKey) { (myMessage, key) =>
          println(s"""A message '${myMessage.toString}' was received over '$key'.""")
          ack
        }
      }
    }
  }
  lifecycle.addStopHook { () =>
    Future(subscriptionRef.close())
  }
}
When I run the Play application, publishing appears to work fine:
rabbitmqadmin --vhost="ipsmsvh" get queue=MyMessage requeue=false -u ... -p ...
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
| MyMessage | ipsmse | 8 | {"id":"1bef63f4-2854-41c1-9736-3fe913eec307","createdAt":1484161631674,"sMSMessage":{"from":"from1","to":"to1","text":"text1","subject":"subject1"}} | 148 | string | True |
+--------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+-------------+
However, the subscriber fails with an error because it expects a different content type:
MyMessage - exception while processing message. Body={"id":"70a43b48-51b8-4adc-86d0-6a250c1d89a8","createdAt":1484161760382,"sMSMessage":{"from":"from1","to":"to1","text":"text1","subject":"subject1"}}. Envelope=Envelope(deliveryTag=1, redeliver=true, exchange=ipsmse, routingKey=MyMessage)
com.spingo.op_rabbit.MismatchedContentType: MismatchedContentType: expected 'application/json', received 'text/plain'
at com.spingo.op_rabbit.PlayJsonSupport$$anon$2.unmarshall(PlayJsonSupport.scala:53)
Here is my marshaller/unmarshaller:
object Marshaller {
  implicit val myMessageMarshaller =
    new RabbitMarshaller[MyMessage]
      with RabbitUnmarshaller[MyMessage] {
      val contentType = "application/json"
      val contentEncoding = Some("UTF-8")
      override def marshall(value: MyMessage) =
        value.toString.getBytes
      override def unmarshall(
        value: Array[Byte],
        contentType: Option[String],
        charset: Option[String]): MyMessage = {
        val json = Json.parse(new String(value))
        MyMessage.myMessageReads.reads(json).getOrElse(new MyMessage(null, null, null))
      }
    }
}
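Any ideas about what might be causing the error and how to fix it? Is it just a matter of setting the content type, and if so, how do I set it? Thanks!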
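
To make my guess concrete, here is a minimal, library-free sketch of what I think is happening. It is not op-rabbit's actual code: every name in it (ContentTypeDemo, Delivery, Marshaller, JsonDoc, publish, consumeJson) is hypothetical. The assumption is that the content type is picked by the marshaller for the type being published, so publishing a pre-serialized String is labelled text/plain, while the JSON unmarshaller on the consuming side only accepts application/json.

```scala
// Simplified model of a content type check; NOT op-rabbit's real implementation.
// All names below are hypothetical and exist only for illustration.
object ContentTypeDemo {
  // A published message: raw bytes plus the content type attached by the marshaller.
  final case class Delivery(body: Array[Byte], contentType: String)

  trait Marshaller[T] {
    def contentType: String
    def marshall(value: T): Array[Byte]
  }

  object Marshaller {
    // A plain String is assumed to be labelled text/plain, even if it contains JSON.
    implicit val stringMarshaller: Marshaller[String] = new Marshaller[String] {
      val contentType = "text/plain"
      def marshall(value: String): Array[Byte] = value.getBytes("UTF-8")
    }
  }

  // Stand-in for a typed message that has a JSON marshaller.
  final case class JsonDoc(text: String)

  object JsonDoc {
    implicit val jsonMarshaller: Marshaller[JsonDoc] = new Marshaller[JsonDoc] {
      val contentType = "application/json"
      def marshall(value: JsonDoc): Array[Byte] = value.text.getBytes("UTF-8")
    }
  }

  // The marshaller is selected from the static type of the published value.
  def publish[T](value: T)(implicit m: Marshaller[T]): Delivery =
    Delivery(m.marshall(value), m.contentType)

  // The consumer rejects anything that is not labelled application/json.
  def consumeJson(d: Delivery): Either[String, String] =
    if (d.contentType == "application/json") Right(new String(d.body, "UTF-8"))
    else Left(s"expected 'application/json', received '${d.contentType}'")

  def main(args: Array[String]): Unit = {
    val payload = """{"id":"1"}"""
    println(consumeJson(publish(payload)))          // published as a String
    println(consumeJson(publish(JsonDoc(payload)))) // published as a typed value
  }
}
```

If this model is right, the fix might be to publish the MyMessage value itself (rather than the result of Json.toJson(myMessage).toString) with a JSON marshaller in implicit scope, so that the message is labelled application/json. I have not confirmed that this is how op-rabbit chooses the content type.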