我有一个简单的兔子设置,目前正在做我想做的事......
它根据消息的类型发布消息。每种类型都有自己的队列。
当消息发布时,即使没有消费者消费它们,它们也会坐在队列中(如果没有消费者到达,则永远坐在那里)。
当消费者在那里(只有一个!)时,它会吃掉消息。
如果由于某种原因它无法处理消息(例如,它在父消息到达之前获得子消息)它会将nack
消息返回到队列中。
如果它看到相同的消息六次,它nack
就是该消息。
这一切都有效,但目前在六次尝试后它会丢弃消息。
我想要的是将消息传递到“死信队列”,并在一段时间后(比如 5 分钟)将该消息重新排队到它来自的特定队列的末尾。
我绝对是货物崇拜程序,我不太了解所有的交换/队列/绑定/路由键和其他涉及的奥秘......感谢手持!
public void PublishEntity<T>(T message) where T : class, ISendable
{
logger.Info($"publishing {message.UniqueId}");
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitHostName,
UserName = appSettings.RabbitUsername,
Password = appSettings.RabbitPassword
};
try
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
channel.QueueDeclare($"App_{typeof(T).Name}",
true,
false,
false,
null);
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.TxSelect();
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object>
{
{ "Id", Guid.NewGuid().ToString() }
};
channel.BasicPublish("",
$"App_{typeof(T).Name}",
properties,
body);
Data.MarkAsSent(message);
channel.TxCommit();
}
}
}
ISendable
只需确保消息具有一些属性,用于在Data.MarkAsSent(message);
我们拥有的数据库中进行标记。
接收器有一个类似的代码块来处理每种类型。正如我所说,这是有效的。
我需要做什么来添加死信队列的东西?
我这样的尝试创建了死信队列,但没有任何东西移动到它们。
public void PublishEntity<T>(T message) where T : class, ISendable
{
logger.Info($"publishing {message.UniqueId}");
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitHostName,
UserName = appSettings.RabbitUsername,
Password = appSettings.RabbitPassword
};
try
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
channel.ExchangeDeclare("App.Dead.Letter", "direct", true);
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "App.Dead.Letter" },
{
"x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
}
};
channel.QueueDeclare($"App_{typeof(T).Name}",
true,
false,
false,
args);
channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
true,
false,
false,
null);
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.TxSelect();
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object>
{
{ "Id", Guid.NewGuid().ToString() }
};
channel.BasicPublish("",
$"App_{typeof(T).Name}",
properties,
body);
Data.MarkAsSent(message);
channel.TxCommit();
}
}
}
在我的接收器中,我有这个魔力
catch (Exception ex)
{
var attemptsToHandle = MarkFailedToHandleMessage(logId, ex);
if (attemptsToHandle > 5)
{
//If we have seen this message many times then don't re-que.
channel.BasicNack(ea.DeliveryTag, false, false);
return;
}
// re-que so we can re-try later.
channel.BasicNack(ea.DeliveryTag, false, true);
return;
}
呼...很多代码。谢谢,如果你已经做到了这一步......
我在问我的代码中有哪些明显的问题使事情落入死信队列。
以及我需要添加什么额外内容,以便 dlq 中的内容在一段时间后反弹回主队列。此外,这为每种类型的队列设置了一个 dlq……这是必需的还是应该有一个队列来保存错误消息?