【.NET 6】RabbitMQ延迟消费指南

背景最近遇到一个比较特殊需求,需要修改一个的RabbitMQ消费者,以实现在消费某种特定的类型消息时,延迟1小时再处理 , 几个需要注意的点:

  • 延迟是以小时为单位
  • 不是所有消息都延迟消费,只延迟特定类型的消息
  • 只在第一次消费时延迟1小时,容错机制产生的重新消费(也即消息消费失败,多次进入延迟队列重试),则不再延迟1小时
  • 消费者消费过程中可能会重启
考虑到这几点,我们需要一个标识以及持久化,不能简单使用Thread.Sleep或者Task.Delay;下面开始演示在不引入其它框架资源的前提下,利用现有的RabbitMQ来实现这个需求 。
准备如果没有可用的RabbitMQ测试环境,推荐使用docker本地搭建
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management项目搭建创建解决方案RabbitMQDemo,并添加一个.Net6控制台程序Producer作为生产者,
mkdir RabbitMQDemocd RabbitMQDemodotnet new sln -n RabbitMQDemomkdir srccd srcdotnet new console -n Producercd Producerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/Producer/Producer.csproj我们给Producer项目添加了两个包 ——EasyNetQ是用来简便RabbitMQ操作,添加Newtonsoft.Json则是因为EasyNetQ从v7版本开始移除了对前者的依赖,需要使用者自行添加 。
接下来定义消息的数据结构,添加一个类库Core到解决方案,
cd srcdotnet new classlib --name Corecd ..dotnet sln add ./src/Core/Core.csproj添加如下OrderNotification类 , 后面我们根据消息的 Type的值来确定是正常消费还是延迟消费 。
namespace Core{public class OrderNotification{public string OrderId { get; set; }public int Type { get; set; }public DateTime DateCreation { get; set; }}}生产者在Producer项目里 , 声明队列orders.notification,绑定到同名交换机 , 然后向该交换机发送OrderNotification类型的数据,
实际项目中,我们很少直接发消息到队列,都是发送到交换机,这个项目虽然只是demo,但也遵循这个原则
完整代码如下:
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");//声明交换机var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);//声明队列var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");//绑定await bus.Advanced.BindAsync(sourceExchange, sourceQueue, "");Console.WriteLine("按Ctrl + C 暂停发送,任意键恢复发送");Console.TreatControlCAsInput = true;while (true){Random random = new();var orderId = Guid.NewGuid().ToString();var type = random.Next(1, 3);await bus.Advanced.PublishAsync(sourceExchange, "", true, new Message<OrderNotification>(new OrderNotification { OrderId = orderId, Type = type, DateCreation = DateTime.Now }));Console.WriteLine($"{DateTime.Now}:消息(OrderId:{orderId},Type:{type}) 已发送");Thread.Sleep(1000);}运行Producer项目,可以看到消息正在不停的发送
【.NET 6】RabbitMQ延迟消费指南

文章插图
打开RabbitMQ后台,名orders.notification的队列和交换机已经创建好且相互绑定,队列里已经有我们刚刚发送的消息
【.NET 6】RabbitMQ延迟消费指南

文章插图
下面我们要做的就是将队列orders.notificationType为1的消息延迟消费,其它则正常消费 。
延迟消费使用死信交换机实现原理就是在声明一个队列时 , 给它配置死信交换机(Dead Letter Exchanges,简称DLX)策略,对应参数为x-dead-letter-exchange,这种队列处理带设置了过期时间属性(Properties.expiration)的消息时,在消息到期时,会自动将消息投递到事先配置好的死信交换机上 。
我们解决方案增加一个控制台类型的消费者项目DLXConsumer
cd srcdotnet new console -n DLXConsumercd DLXConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DLXConsumer/DLXConsumer.csproj和生产者类似,实现消费者我们也创建一对同名的交换机和队列

推荐阅读