我正在尝试RabbitMQ
通过Akka.streams.Ampq
源阅读传入的消息,但RoutingKey
不正确。
另一个令人担忧的问题是信封不包含交换名称。
//code coming back with incorrect key
public void Consume(IActorRef consumerActor)
{
//Source =RabbitMQ
//Sink = Our App
var queueDeclaration = QueueDeclaration.Create(QueueName)
.WithDurable(true)
.WithAutoDelete(false);
var amqpSource = AmqpSource.AtMostOnceSource(
NamedQueueSourceSettings.Create(ConnectionSettings, QueueName).WithDeclarations(queueDeclaration),
bufferSize: 10);
var sink = Sink.ActorRef<SubMessage>(consumerActor, "complete");
var result =
amqpSource.Select(b => new SubMessage(b.Bytes.ToString(Encoding.UTF8), ConvertProperties(b.Properties), b.Envelope.RoutingKey))
.TakeWhile(x => { return true; }, true)
.RunWith(sink, Materializer);
}
//publish Method
public void Publish(dynamic message, string exchangeName)
{
var typeOf = message.GetType().Name;
var jsonMessage = Newtonsoft.Json.JsonConvert.SerializeObject(message);
//Source = Our App
//Sink = RabbitMQ
//Connections
AmqpConnectionDetails.Create(_ampqSettings.Host, _ampqSettings.Port)
.WithCredentials(AmqpCredentials.Create(_ampqSettings.UserName, _ampqSettings.Password))
.WithAutomaticRecoveryEnabled(true)
.WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));
var queueDeclaration = QueueDeclaration.Create(QueueName)
.WithDurable(true)
.WithAutoDelete(false);
var exchangeDeclaration = ExchangeDeclaration.Create(exchangeName, "direct").WithDurable(true);
var bindingDeclaration = BindingDeclaration.Create(QueueName, exchangeName).WithRoutingKey(typeOf);
//create sink
var amqpSink = AmqpSink.CreateSimple(
AmqpSinkSettings.Create(ConnectionSettings)
.WithRoutingKey(QueueName)
.WithDeclarations(exchangeDeclaration, queueDeclaration, bindingDeclaration));
//run sink
Source.From(new string[] { jsonMessage }).Select(ByteString.FromString).RunWith(amqpSink, Materializer).Wait();
}
//Extra Info below
//**********Class constructor
private readonly IAMQPSettings _ampqSettings;
private readonly Akka.Actor.ActorSystem System;
private readonly AmqpConnectionDetails ConnectionSettings;
private readonly ActorMaterializer Materializer;
private readonly string QueueName;
public EventBus(Akka.Actor.ActorSystem system, IAMQPSettings ampqSettings)
{
_ampqSettings = ampqSettings;
System = system;
ConnectionSettings = AmqpConnectionDetails.Create(ampqSettings.Host, ampqSettings.Port)
.WithCredentials(AmqpCredentials.Create(ampqSettings.UserName, ampqSettings.Password))
.WithAutomaticRecoveryEnabled(true)
.WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));
Materializer = ActorMaterializer.Create(System);
QueueName = ampqSettings.QueueName;
}
//SubMessage structure
public class SubMessage
{
public SubMessage(string message, Dictionary<string, object> properties = null, string routingKey = null, string exchangeName = null)
{
ExchangeName = exchangeName;
Message = message;
Properties = properties;
RoutingKey = routingKey;
}
public string Message { get; private set; }
public Dictionary<string, object> Properties { get; private set; }
public string RoutingKey { get; private set; }
public string ExchangeName { get; private set; }
}
实际行为:我得到了
预期的行为:我期望得到QueueName
哪个是我的班级,例如。"Tax_Queue"
QueueName
MyTestClass
RoutingKey