背景最近遇到一个比较特殊需求,需要修改一个的RabbitMQ消费者,以实现在消费某种特定的类型消息时,延迟1小时再处理 , 几个需要注意的点:
- 延迟是以小时为单位
- 不是所有消息都延迟消费,只延迟特定类型的消息
- 只在第一次消费时延迟1小时,容错机制产生的重新消费(也即消息消费失败,多次进入延迟队列重试),则不再延迟1小时
- 消费者消费过程中可能会重启
Thread.Sleep
或者Task.Delay
;下面开始演示在不引入其它框架资源的前提下,利用现有的RabbitMQ来实现这个需求 。准备如果没有可用的RabbitMQ测试环境,推荐使用docker本地搭建
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
项目搭建创建解决方案RabbitMQDemo
,并添加一个.Net6控制台程序Producer
作为生产者,mkdir RabbitMQDemocd RabbitMQDemodotnet new sln -n RabbitMQDemomkdir srccd srcdotnet new console -n Producercd Producerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/Producer/Producer.csproj
我们给Producer
项目添加了两个包 ——EasyNetQ
是用来简便RabbitMQ
操作,添加Newtonsoft.Json
则是因为EasyNetQ
从v7版本开始移除了对前者的依赖,需要使用者自行添加 。接下来定义消息的数据结构,添加一个类库
Core
到解决方案,cd srcdotnet new classlib --name Corecd ..dotnet sln add ./src/Core/Core.csproj
添加如下OrderNotification
类 , 后面我们根据消息的 Type
的值来确定是正常消费还是延迟消费 。namespace Core{public class OrderNotification{public string OrderId { get; set; }public int Type { get; set; }public DateTime DateCreation { get; set; }}}
生产者在Producer
项目里 , 声明队列orders.notification
,绑定到同名交换机 , 然后向该交换机发送OrderNotification
类型的数据,实际项目中,我们很少直接发消息到队列,都是发送到交换机,这个项目虽然只是demo,但也遵循这个原则完整代码如下:
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");//声明交换机var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);//声明队列var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");//绑定await bus.Advanced.BindAsync(sourceExchange, sourceQueue, "");Console.WriteLine("按Ctrl + C 暂停发送,任意键恢复发送");Console.TreatControlCAsInput = true;while (true){Random random = new();var orderId = Guid.NewGuid().ToString();var type = random.Next(1, 3);await bus.Advanced.PublishAsync(sourceExchange, "", true, new Message<OrderNotification>(new OrderNotification { OrderId = orderId, Type = type, DateCreation = DateTime.Now }));Console.WriteLine($"{DateTime.Now}:消息(OrderId:{orderId},Type:{type}) 已发送");Thread.Sleep(1000);}
运行Producer
项目,可以看到消息正在不停的发送文章插图
打开RabbitMQ后台,名
orders.notification
的队列和交换机已经创建好且相互绑定,队列里已经有我们刚刚发送的消息文章插图
下面我们要做的就是将队列
orders.notification
里Type
为1的消息延迟消费,其它则正常消费 。延迟消费使用死信交换机实现原理就是在声明一个队列时 , 给它配置死信交换机(Dead Letter Exchanges,简称DLX)策略,对应参数为
x-dead-letter-exchange
,这种队列处理带设置了过期时间属性(Properties.expiration
)的消息时,在消息到期时,会自动将消息投递到事先配置好的死信交换机上 。我们解决方案增加一个控制台类型的消费者项目
DLXConsumer
cd srcdotnet new console -n DLXConsumercd DLXConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DLXConsumer/DLXConsumer.csproj
和生产者类似,实现消费者我们也创建一对同名的交换机和队列
推荐阅读
- 王者荣耀9月26日微信每日一题答案是什么
- 简读《ASP.NET Core技术内幕与项目实战》之3:配置
- 英雄联盟战利品钥匙获取方法是什么
- 进击的巨人最终季大结局_进击的巨人最终季的结局
- 苹果12有几款机型_苹果12有几个型号
- 华容道“过五关”怎么玩(华容道在线免费玩)
- 十一 【Kubernetes】K8s笔记:Ingress 集群进出流量总管
- 弹壳特攻队推图选择哪些技能
- 苹果13promax详细参数_参数配置表
- 如何调整屏幕分辨率(分辨率1920x1080怎么设置)