MQ系列6:消息的消费( 三 )


  • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,非第一次启动接着上次消费的进度继续消费
  • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,非第一次启动接着上次消费的进度继续消费
  • CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,非第一次启动接着上次消费的进度继续消费以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置 , 消费者挂了再启动,则从上次消费进度继续执行 。
public class SimpleOrderApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer , 并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");/*** 设置Consumer第一次启动是从队列头部、队列尾部、还是指定时间戳节点开始消费* 非第一次启动接着上次消费的进度继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3.指定Topic和Tag 信息 。* 代表所有consumer.subscribe("testTopic", "*");// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo , 具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}2.3 过滤消息消费可以使用MessageSelector.byTag来进行标签筛?。换蛘呤褂肕essageSelector.bySql 来进行消息属性筛?。换蛘呋旌鲜褂?。参考下面代码,注释说明的比较清楚 。
public class FilterConsumerApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");// 3.指定Topic和Tag 信息 。只有订阅的消息有 sex 和 name 属性, 并且年龄为 18 岁以上的男性// consumer.subscribe("testTopic", MessageSelector.byTag("userTag1 || userTag2"));consumer.subscribe("testTopic", MessageSelector.bySql("sex = 'male' AND age > 18"));// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo , 具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}3 总结
  • 消费方式:Push(推) 或者 Pull(拉)
  • 消费模式:广播模式和集群模式
  • 消息消费反馈
  • 流量控制(包括消费并发线程数设置)
  • 消息的过滤(Tag, Key),过滤标签 TagA||TagB||TagC
【MQ系列6:消息的消费】

推荐阅读