0

我正在尝试RabbitMQ通过Akka.streams.Ampq源阅读传入的消息,但RoutingKey不正确。

另一个令人担忧的问题是信封不包含交换名称。

//code coming back with incorrect key
public void Consume(IActorRef consumerActor)
{
    //Source =RabbitMQ
    //Sink  =   Our App

    var queueDeclaration = QueueDeclaration.Create(QueueName)
        .WithDurable(true)
        .WithAutoDelete(false);

    var amqpSource = AmqpSource.AtMostOnceSource(
        NamedQueueSourceSettings.Create(ConnectionSettings, QueueName).WithDeclarations(queueDeclaration),
        bufferSize: 10);

    var sink = Sink.ActorRef<SubMessage>(consumerActor, "complete");

    var result =
       amqpSource.Select(b => new SubMessage(b.Bytes.ToString(Encoding.UTF8), ConvertProperties(b.Properties), b.Envelope.RoutingKey))
        .TakeWhile(x => { return true; }, true)
            .RunWith(sink, Materializer);
}

//publish Method
public void Publish(dynamic message, string exchangeName)
{
    var typeOf = message.GetType().Name;
    var jsonMessage = Newtonsoft.Json.JsonConvert.SerializeObject(message);

    //Source = Our App
    //Sink  =  RabbitMQ

    //Connections
    AmqpConnectionDetails.Create(_ampqSettings.Host, _ampqSettings.Port)
        .WithCredentials(AmqpCredentials.Create(_ampqSettings.UserName, _ampqSettings.Password))
        .WithAutomaticRecoveryEnabled(true)
        .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));

    var queueDeclaration = QueueDeclaration.Create(QueueName)
                                            .WithDurable(true)
                                            .WithAutoDelete(false);

    var exchangeDeclaration = ExchangeDeclaration.Create(exchangeName, "direct").WithDurable(true);

    var bindingDeclaration = BindingDeclaration.Create(QueueName, exchangeName).WithRoutingKey(typeOf);
    //create sink
    var amqpSink = AmqpSink.CreateSimple(
        AmqpSinkSettings.Create(ConnectionSettings)
            .WithRoutingKey(QueueName)
            .WithDeclarations(exchangeDeclaration, queueDeclaration, bindingDeclaration));

    //run sink
    Source.From(new string[] { jsonMessage }).Select(ByteString.FromString).RunWith(amqpSink, Materializer).Wait();
}

//Extra Info below
//**********Class constructor 
private readonly IAMQPSettings _ampqSettings;
private readonly Akka.Actor.ActorSystem System;
private readonly AmqpConnectionDetails ConnectionSettings;
private readonly ActorMaterializer Materializer;
private readonly string QueueName;
public EventBus(Akka.Actor.ActorSystem system, IAMQPSettings ampqSettings)
{
    _ampqSettings = ampqSettings;
    System = system;
    ConnectionSettings = AmqpConnectionDetails.Create(ampqSettings.Host, ampqSettings.Port)
                                               .WithCredentials(AmqpCredentials.Create(ampqSettings.UserName, ampqSettings.Password))
                                               .WithAutomaticRecoveryEnabled(true)
                                               .WithNetworkRecoveryInterval(TimeSpan.FromSeconds(1));

    Materializer = ActorMaterializer.Create(System);
    QueueName = ampqSettings.QueueName;
}

//SubMessage structure
public class SubMessage
{
    public SubMessage(string message, Dictionary<string, object> properties = null, string routingKey = null, string exchangeName = null)
    {
        ExchangeName = exchangeName;
        Message = message;
        Properties = properties;
        RoutingKey = routingKey;
    }

    public string Message { get; private set; }
    public Dictionary<string, object> Properties { get; private set; }
    public string RoutingKey { get; private set; }
    public string ExchangeName { get; private set; }
}

实际行为:我得到了 预期的行为:我期望得到QueueName 哪个是我的班级,例如。"Tax_Queue"QueueNameMyTestClass RoutingKey

4

1 回答 1

0

我发现了问题:代码说 .WithRoutingKey(QueueName) 而不是 WithRoutingKey(routingKey)

于 2019-08-01T11:07:15.077 回答