3.4.2.2消费消息手动确认配置类
- 配置消费者的数量 setConcurrentConsumers(2);
- 最大并发消费者数量 setMaxConcurrentConsumers(5);
- 消费消息确认机制为手动 setAcknowledgeMode(AcknowledgeMode.MANUAL);
- 设置监听消息队列的名称,支持多个队列setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96);
- 设置消息手动确认监听器 setMessageListener(rabbitMQConsumerManualAckListener);
/** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: RabbitMQ消费消息手动确认配置类 */@Configurationpublic class RabbitMQConsumeManualAckConfig {@Autowiredprivate RabbitMQConsumerManualAckListener rabbitMQConsumerManualAckListener;/*** @author : huayu* @date: 2/11/2022* @param: [connectionFactory]* @return : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer* @description : 自定义消息监听器工程对象*/@Beanpublic SimpleMessageListenerContainer simpleBrokerMessageHandler(ConnectionFactory connectionFactory){//初始化消息监听容器的工程对象SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);//初始化并发消费者的数量,比如是2 , 代表同时会有 两个消费者 消费消息// ,投递标识可能会相同container.setConcurrentConsumers(2);//设置最大的并发消费者数量,数量不能低于初始化并发消费者数量//可以动态的设定当前容器的消费者数量,可以实现动态增加和减少消费者的算法在 SimpleMessageListenerContainer类中实现container.setMaxConcurrentConsumers(5);//底层动态实现消费者数量的增加减少原理// 有consumer已连续十个周期(consecutiveActiveTrigger)处于活动状态,并且自启动后最后一个consumer运行至少经过了10秒钟 , 则将启动新的consumer 。// private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000;// 停止消费者算法的时间间隔// 有consumer已连续10个周期(consecutiveIdleTrigger)连续空闲状态 , 并且上一个consumer至少在60秒之前停止,那么该consumer将停止// private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000;// 默认连续活动10个周期// private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;// 默认连续空闲10个周期// private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;//默认的消费消息确认机制是自动,需要改为手动container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置监听消息队列的名称 , 支持多个队列(队列名1 , 队列名2...),注意前提是指定的队列必须是存在的//监听 直连模式的 RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96 队列container.setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96);//监听 扇形模式的//RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE 队列//和 RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO 队列//container.setQueueNames(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE//,RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO);//指定消息确认的处理类,会同时产生多个消费者,参数是上面设置的,//注意之前使用直连模式,消息消费者,要注释掉,防止同类型的监听器 , 处理同一队列//如果不是被当前消息确认的处理类消费(使用注解@RabbitListener),会导致消息不执行手动处理container.setMessageListener(rabbitMQConsumerManualAckListener);// 返回消息监听容器工厂对象return container;}}
3.4.2.3 请求方法//======================/*** @author : huayu* @date: 3/11/2022* @param: [ackMsg]* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>* @description : 测试 消费者手动确认*/@GetMapping("/consumeAckManual")public RequestResult<String> testRabbitMQConsumeAckManual(@RequestParam String ackMsg){log.info("------- 测试Ack 手动 确认,发送消息 -------");//消息手动确认//模拟发送直连消息//测试1,2rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96,JSON.toJSONString(ackMsg));return ResultBuildUtil.success("使用直连模式 手动消费确认-消息确认成功");//测试3//模拟发送扇形消息//rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange(//RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96//,null//,JSON.toJSONString(ackMsg));//////return ResultBuildUtil.success("使用扇形模式 手动消费确认-消息确认成功");}
3.4.2.4 请求测试3.4.2.4.1 模拟发送直连消息并成功确认发送请求:文章插图
请求结果:
文章插图
3.4.2.4.2 模拟发送直连消息,抛出异常,重回队列发送请求:
推荐阅读
- 剑与远征8月诗社竞答第六天答案是什么
- 十六 企业级自定义表单引擎解决方案--Excel导入导出
- Oracle中查询表结构的六种方法
- 六 Selenium4+Python3系列 - Selenium的三种等待,强制等待、隐式等待、显式等待
- 某 .NET RabbitMQ SDK 有采集行为,你怎么看?
- 梦幻西游手游妙法试炼第六章怎么通关
- 之六 2流高手速成记:从SpringBoot到SpringCloudAlibaba
- 伍六七第三季更新时间_伍六七第三季什么时候更新
- 鸡皮疙瘩第六关恐惧怎么过
- .Net Core&RabbitMQ限制循环消费