文章插图
此处假定接收100条消息,在接收到第50条消息时设置拒收,并且设置了requeue为false 。
var dlxExchangeName = "dlx_exchange";channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);var dlxQueueName = "dlx_queue";channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");var queueName = "nackorreject_queue";var arguments = new Dictionary<string, object>{ { "x-dead-letter-exchange", dlxExchangeName }};channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ var message = ea.Body; Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray())); if (Encoding.UTF8.GetString(message.ToArray()).Contains("50")) { Console.WriteLine("拒收"); ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//关键在于requeue=false return; } ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
【.Net Core&RabbitMQ限制循环消费】如此一来,拒收消息不会重入队列,并且现有队列绑定了死信交换机,因此,消息进入到死信队列中,如不绑定 , 则消息丢失 。文章插图
限定重试次数设置重试次数,限定循环消费的次数,允许短暂的循环,但最终打破循环 。
消息头设定次数在消息头中设置次数记录作为标记,但是 , 消费端无法对接收到的消息修改消息头然后将原消息送回MQ,因此,需要将原消息内容重新发送消息到MQ,具体步骤如下
- 原消息设置不重入队列 。
- 再发送新的消息其内容与原消息一致,可设置新消息的消息头来携带重试次数 。
- 消费端再次消费时,便可从消息头中查看消息被消费的次数 。
文章插图
var queueName = "messageheaderretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ var message = ea.Body; Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray())); if (Encoding.UTF8.GetString(message.ToArray()).Contains("5")) { var maxRetryCount = 3; Console.WriteLine($"拒收 {DateTime.Now}"); //初次消费 if (ea.BasicProperties.Headers == null) { //原消息设置为不重入队列 ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false); //发送新消息到队列中 RetryPublishMessage(channel, queueName, message.ToArray(), 1); return; } //获取重试次数 var retryCount = ParseRetryCount(ea); if (retryCount < maxRetryCount) { //原消息设置为不重入队列 ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false); //发送新消息到队列中 RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1); return; } //到达最大次数,不再重试消息 ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false); return; } ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount){ var basicProperties = channel.CreateBasicProperties(); basicProperties.Headers = new Dictionary<string, object>(); basicProperties.Headers.Add("retryCount", retryCount); channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);}static int ParseRetryCount(BasicDeliverEventArgs ea){ var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount); if (!existRetryRecord) { throw new Exception("没有设置重试次数"); } return (int)retryCount;}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- IQueryable和IEnumerable 快读《ASP.NET Core技术内幕与项目实战》EFCore2.5:集合查询原理揭秘
- 「MySQL高级篇」MySQL锁机制 && 事务
- <五>掌握左值引用和初识右值引用
- .NET6打包部署到Windows Service
- Linux软件安装方式 - Tarball&RPM&YUM
- <四>1:全面掌握Const的用法
- Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式
- 是什么让.NET7的Min和Max方法性能暴增了45倍?
- 前端性能优化——首屏时间&&白屏时间
- 19 基于.NetCore开发博客项目 StarBlog - Markdown渲染方案探索