.Net Core&RabbitMQ限制循环消费( 三 )

消息被拒收后,再重新发送消息到原有交换机或是队列下中,以使得消息像是消费失败回到了队列中 , 如此来控制消费次数 , 但是这种场景下 , 新消息排在了队列的尾部,而不是原消息排在队列头部 。

.Net Core&RabbitMQ限制循环消费

文章插图
存储重试次数在存储服务中存储消息的唯一标识与对应重试次数,消费消息前对消息进行判断是否存在 。
.Net Core&RabbitMQ限制循环消费

文章插图
与消息头判断一致,只是消息重试次数的存储从消息本身挪入存储服务中了 。需要注意的是,消息发送端需要设置消息的唯一标识(MessageId属性)
//模拟外部存储服务var MessageRetryCounts = new Dictionary<ulong, int>();var queueName = "storageretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));if (Encoding.UTF8.GetString(message.ToArray()).Contains("50")){    var maxRetryCount = 3;    Console.WriteLine("拒收");    //重试次数判断    var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);    if (!existRetryRecord)    {        //重入队列,继续重试        MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)    {        //重入队列,继续重试        MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    //到达最大次数 , 不再重试消息    ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);    return;}    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);除第一次拒收外 , 允许三次重试机会,三次重试完毕后 , 设置requeue为false,消息丢失或进入死信队列(如有设置的话) 。
.Net Core&amp;RabbitMQ限制循环消费

文章插图
队列使用Quorum类型第一种和第二种分别是消息自身、外部存储服务来管理消息重试次数,使用Quorum , 由MQ来限定消息的投递次数,也就控制了重试次数 。
.Net Core&amp;RabbitMQ限制循环消费

文章插图
设置队列类型为quorum , 设置投递最大次数,当超过投递次数后,消息被丢弃 。
var queueName = "quorumtype_queue";var arguments = new Dictionary<string, object>(){    { "x-queue-type", "quorum"},    { "x-delivery-limit", 3 }};channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine($"拒收 {DateTime.Now}");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);第一次消费被拒收重入队列后,经最大三次投递后,消费端不再收到消息 , 如此一来也限制了消息的循环消费 。
.Net Core&amp;RabbitMQ限制循环消费

文章插图
队列消息过期当为消息设置了过期时间时 , 当消息没有受到Ack,且还在队列中,受到过期时间的限制,反复消费但未能成功时,消息将走向过期,进入死信队列或是被丢弃 。
聚焦于过期时间的限制,因此在消费者端,因异常或是拒收消息时,需要对requeue设置为true,将消息再次重入到原队列中 。

推荐阅读