.Net Core&RabbitMQ限制循环消费( 二 )


.Net Core&RabbitMQ限制循环消费

文章插图
此处假定接收100条消息,在接收到第50条消息时设置拒收,并且设置了requeue为false 。
var dlxExchangeName = "dlx_exchange";channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);var dlxQueueName = "dlx_queue";channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");var queueName = "nackorreject_queue";var arguments = new Dictionary<string, object>{    { "x-dead-letter-exchange", dlxExchangeName }};channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine("拒收");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//关键在于requeue=false        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);【.Net Core&RabbitMQ限制循环消费】如此一来,拒收消息不会重入队列,并且现有队列绑定了死信交换机,因此,消息进入到死信队列中,如不绑定 , 则消息丢失 。
.Net Core&amp;RabbitMQ限制循环消费

文章插图
限定重试次数设置重试次数,限定循环消费的次数,允许短暂的循环,但最终打破循环 。
消息头设定次数在消息头中设置次数记录作为标记,但是 , 消费端无法对接收到的消息修改消息头然后将原消息送回MQ,因此,需要将原消息内容重新发送消息到MQ,具体步骤如下
  1. 原消息设置不重入队列 。
  2. 再发送新的消息其内容与原消息一致,可设置新消息的消息头来携带重试次数 。
  3. 消费端再次消费时,便可从消息头中查看消息被消费的次数 。
    .Net Core&amp;RabbitMQ限制循环消费

    文章插图
此处假定接收10条消息,在接收到第5条消息时设置拒收 ,  当消息头中重试次数未超过设定的3次时,消息可以重入队列,再次被消费 。
var queueName = "messageheaderretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))    {        var maxRetryCount = 3;        Console.WriteLine($"拒收 {DateTime.Now}");        //初次消费        if (ea.BasicProperties.Headers == null)        {            //原消息设置为不重入队列            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);            //发送新消息到队列中            RetryPublishMessage(channel, queueName, message.ToArray(), 1);            return;        }        //获取重试次数        var retryCount = ParseRetryCount(ea);        if (retryCount < maxRetryCount)        {            //原消息设置为不重入队列            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);            //发送新消息到队列中            RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);            return;        }        //到达最大次数,不再重试消息        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount){    var basicProperties = channel.CreateBasicProperties();    basicProperties.Headers = new Dictionary<string, object>();    basicProperties.Headers.Add("retryCount", retryCount);    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);}static int ParseRetryCount(BasicDeliverEventArgs ea){    var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);    if (!existRetryRecord)    {        throw new Exception("没有设置重试次数");    }    return (int)retryCount;}

推荐阅读