0

使用主题交换,我想要一个具有以下功能的发布/订阅消息模式:

  1. 实施了“出版商确认”。
  2. 消费者在处理完每条消息后也对其进行确认。
  3. 使用路由键将消息路由到一个或多个消费者。
  4. 拥有持久的消费者队列,因此如果消费者应用程序暂时关闭,它可以在恢复时从其队列中获取消息。

所以我创建了 2 个控制台应用程序(发送和接收)来测试上述内容。

发送

    static void Main(string[] args)
    {

        Console.WriteLine(" Type [exit] to exit.");

        Publisher publisher = new Publisher();

        do
        {
            var userInput = Console.ReadLine();
            if (userInput == "exit")
            {
                break;
            }


            publisher.SendMessageToBroker("localhost", "main", "user.update", userInput);

        } while (true);
    }

出版商

public class Publisher
{
    const string ExchangeType = "topic";

    Dictionary<ulong, string> unConfirmedMessageTags = new Dictionary<ulong, string>();

    public void SendMessageToBroker(string host, string exchangeName, string routingKey, string message)
    {

        var factory = new ConnectionFactory() { HostName = host };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {                
            channel.BasicAcks += (sender, ea) => OnBasicAcks(ea.Multiple, ea.DeliveryTag);
            channel.BasicNacks += (sender, ea) => OnBasicNacks(ea.Multiple, ea.DeliveryTag);

            channel.ConfirmSelect();

            channel.ExchangeDeclare(exchangeName, ExchangeType);

            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            unConfirmedMessageTags.TryAdd(channel.NextPublishSeqNo, message);

            channel.BasicPublish(exchange: exchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }

    private void OnBasicNacks(bool multiple, ulong deliveryTag)
    {
        if (multiple)
        {
            Console.WriteLine("Messages with delivery tag LESS THAN {0} have been LOST and must be resent.", deliveryTag);
        }
        else
        {
            Console.WriteLine("Message with delivery tag {0} has been LOST and must be resent.", deliveryTag);
        }
    }

    private void OnBasicAcks(bool multiple, ulong deliveryTag)
    {
        if (multiple)
        {
            var confirmed = unConfirmedMessageTags.Where(k => k.Key <= deliveryTag);
            foreach (var entry in confirmed)
            {
                unConfirmedMessageTags.Remove(entry.Key);
                Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", entry.Key);
            }

        }
        else
        {
            unConfirmedMessageTags.Remove(deliveryTag);
            Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", deliveryTag);
        }

    }
}

}

收到

    static void Main(string[] args)
    {
        const string ExchangeName = "main";
        const string QueueName = "q1";
        const string ExchangeType = "topic";
        const string RoutingKey = "user.update";

        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(ExchangeName, ExchangeType);

            channel.QueueDeclare(queue: QueueName, 
                durable: true, 
                autoDelete: false, 
                exclusive: false, 
                arguments: null);

            channel.QueueBind(QueueName, ExchangeName, RoutingKey);

            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) => Basic_Ack(channel, ea.DeliveryTag, ea.Body);

            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }            
    }

    private static void Basic_Ack(IModel channel, ulong deliveryTag, ReadOnlyMemory<byte> body)
    {            
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine(" [x] Received {0}", message);

        Thread.Sleep(2000);            

        channel.BasicAck(deliveryTag: deliveryTag, multiple: false);

        Console.WriteLine(" [x] Processed {0}", message);
    }
}

问题是我的Send程序中的OnBasicAcks只为第一条消息调用一次。

在此处输入图像描述

4

1 回答 1

0

对于可能遇到此问题的其他任何人,我正在为每个不鼓励的发布打开一个连接和通道(虚拟连接) :

“连接意味着长期存在。为每个操作(例如发布消息)打开连接将非常低效并且非常不鼓励。”

另见这里

“使用 ConfirmSelect 方法在通道级别启用发布者确认...必须在您希望使用发布者确认的每个通道上调用此方法。确认应该只启用一次,而不是为每条发布的消息启用。”

切换到使用长期连接为我解决了这个问题。

于 2020-06-01T11:36:23.793 回答