六 SpringCloud - RabbitMQ安装,三种消息发送模式,消息发送确认,消息消费确认(自动,手动)( 七 )

3.5.1.3 请求方法/*** @author : huayu* @date: 3/11/2022* @param: [ackMsg]* @return : com.kgc.scd.uitl.RequestResult<java.lang.String>* @description : 测试 消费者自动 重试*/@GetMapping("/consumeAckAuto")public RequestResult<String> testRabbitMQConsumeAckAuto(@RequestParam String ackMsg){log.info("------- 直连 模式 测试Ack 自动 重试,发送消息 -------");//模拟发送直连消息//消费消息失败重试机制rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96,JSON.toJSONString(ackMsg));return ResultBuildUtil.success("使用直连模式 消费确认-自动消费成功");}3.5.1.4 请求测试发起请求:

六 SpringCloud - RabbitMQ安装,三种消息发送模式,消息发送确认,消息消费确认(自动,手动)

文章插图
请求结果:
一共重试了五次
间隔时间为1,2,4,8
(如果还有一次应该为10,因为最后一次计算时间16大于最大间隔时间10,按最大间隔时间10重试);
六 SpringCloud - RabbitMQ安装,三种消息发送模式,消息发送确认,消息消费确认(自动,手动)

文章插图
3.4.2 手动确认注意:
  • 手动确认需要先将自动确认的配置注释掉;
  • 使用手动确认,不能再用@RabbitListener 监听,手动确认相关队列 , 需要我们手动配置消费者;
3.4.2.1 消费消息手动确认的监听器
  • 获取消息消费的唯一标识 message.getMessageProperties().getDeliveryTag();
  • 执行业务处理
    • 每个消费者在同一个时间点 , 最多处理一个message,默认是0(全部) channel.basicQos(1);
    • 获取message的消息内容 message
    • 获取消息对应的目标队列,可以实现一些灵活判断处理message.getMessageProperties().getConsumerQueue()
      • 比如根据不同的目标队列进行不同的处理
      • 在消息处理的时候如果出错会被捕获(消息确认失败)
    • 消息确认channel.basicAck(deliveryTag,false);
  • 消息确认失败处理
    • 根据条件判断设置是否重回队列 ,是否支持批量处理channel.basicNack(deliveryTag,true,false);
【六 SpringCloud - RabbitMQ安装,三种消息发送模式,消息发送确认,消息消费确认(自动,手动)】/** * Created On : 2/11/2022. * <p> * Author : huayu * <p> * Description: 消费端 消费消息手动确认的监听器,注意它也是一个消费者,并可以通过 消息监听容器工厂,动态配置多个 */@Slf4j@Componentpublic class RabbitMQConsumerManualAckListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws IOException {//获取消息消费的唯一标识 , rabbitMQ在推送消息时 , 会给每个消息携带一个唯一标识,值是一个递增的正整数long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("====== 消费消息的唯一标识:{}======",deliveryTag);//执行手动确认业务处理try{//给每个消费者在同一个时间点,最多处理一个message,默认是0(全部),换句话说,在接收到消费者的 ack 确认前,不会分发新的消息给当前的消费者//在接收当前消息的ack确认前是不会发送新的消息给它channel.basicQos(1);//获取message的消息内容,发送的消息的json字符串log.info("====== 消息队列中完整消息内容:{} ======",message);//获取发送的实际内容,发送消息的json字符串log.info("====== 发送的实际内容:{} ======",new String(message.getBody(),"utf-8"));//获取消息对应的目标队列,可以实现一些灵活判断//TODO 比如根据目标队列不同 , 可以做不同的处理log.info("======消息的来源队列:{} =======",message.getMessageProperties().getConsumerQueue());//模拟错误 ,当 deliveryTag 为1的时候,进入 报错  , 捕获异常,然后(如果设置了重回队列)将消息重回队列//if(deliveryTag == 1){//int num = 1/0;//}//消费消息的手动确认,消息确认成功-basicAck//第一个参数deliveryTag,消息的唯一标识//第二个参数multiple , 消息是否支持批量确认,如果是true,代表可以一次性确认标识小于等于当前标识的所有消息//如果是false,只会确认当前消息channel.basicAck(deliveryTag,false);}catch (Exception e){//说明消费消息处理失败,如果不进行确认(自动确认 , 投递成功即确认,消费是否正常 , 不关心),消息就会丢失//消息处理失败确认,代表消息没有正确消费 , 注意:此种方式一次只能确认一个消息//第一给参数是消息的唯一标识,//第二个参数是代表是否重回队列,如果是true,重新将该消息放入队列,再次消费//注意:第二个参数要谨慎,必须要结合具体业务场景,根据业务判断是否需要重回队列,一旦处理不当,机会导致消息循环入队,消息挤压//不重回队列 require = false//channel.basicReject(deliveryTag,false);//重回队列 require = truechannel.basicReject(deliveryTag,true);//消息处理失败确认,代表消息没有正确消费,注意 , 此种方式支持批量//第一个参数是消息的唯一标识 , //第二个参数是代表是否支持批量确认//第三给参数代表是否重回队列//不重回队列 require = false//channel.basicNack(deliveryTag,true,false);//重回队列 require = true//channel.basicNack(deliveryTag,false,true);//TODO 手动消费异常处理log.error("====== 消费消息失败,异常信息:{}======",e.getMessage());}}}

推荐阅读