基于Spring的发布订阅模式 EventListener( 二 )


public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {Executor executor = getTaskExecutor();if (executor != null) {executor.execute(new Runnable() {@Overridepublic void run() {invokeListener(listener, event);}});}else {invokeListener(listener, event);}}}大家可以看到如果给它一个executor(java.util.concurrent.Executor),它就可以异步支持发布事件了 。
所以我们发送事件只需要通过ApplicationContext.publishEvent即可
监听器具体代表者是:ApplicationListener
1、其继承自JDK的EventListener
package org.springframework.context;import java.util.EventListener;/** * Interface to be implemented by application event listeners. * Based on the standard {@code java.util.EventListener} interface * for the Observer design pattern. * * <p>As of Spring 3.0, an ApplicationListener can generically declare the event type * that it is interested in. When registered with a Spring ApplicationContext, events * will be filtered accordingly, with the listener getting invoked for matching event * objects only. * * @author Rod Johnson * @author Juergen Hoeller * @param <E> the specific ApplicationEvent subclass to listen to * @see org.springframework.context.event.ApplicationEventMulticaster */public interface ApplicationListener<E extends ApplicationEvent> extends EventListener { /*** Handle an application event.* @param event the event to respond to*/ void onApplicationEvent(E event);}
我们在spring中自己实现的demo:1.定义事件@Datapublic class OrderCreateEvent extends ApplicationEvent {private String tradeOrderNo;private Long userId;public OrderCreateEvent(Object source, String tradeOrderNo, Long userId) {super(source);this.tradeOrderNo = tradeOrderNo;this.userId = userId;}}
2.实现目标(发布者)public interface OrderPublisher {default void publish(ApplicationEvent event) {ApplicationContext context = SpringBeanUtil.getContext();context.publishEvent(event);}}
3.实现监听器方式一@Component@Slf4jpublic class OrderEventHandler {private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();@Async@EventListenerpublic void createHandle(OrderCreateEvent event) {RedisKey redisKey = new RedisKey("order", "create", event.getTradeOrderNo());if (redisTemplate.hasKey(redisKey.toString())) {return;}Order order = (Order) event.getSource();//调用 HSF 接口创建订单落 EStry {Boolean syncResult = orderSyncEsServiceProxy.createOrder(order);if (!syncResult) {log.error("同步订单 tradeOrderNo:[{}],创建订单失败", order.getTradeOrderNo());}} catch (Throwable e) {log.error("同步订单 tradeOrderNo:[{}],创建订单异常:", order.getTradeOrderNo(), e);}String messageId = mqProductConfig.sendMessage(mqPropertiesConfig.getGroupNameMap().get(MqGroupConstant.GroupType.GID_CREATE_ORDER),MqGroupConstant.Tag.CREATE_ORDER_TAG,event.getTradeOrderNo(), 60 * 30L);redisTemplate.opsForValue().set(redisKey.toString(), gson.toJson(event), 60 * 60 * 24, TimeUnit.SECONDS);// 锁定用户权益benefitServiceProxy.lockCouponBenefit(order);// 用户信息UserInfoDTO userInfoDTO =userServiceProxy.queryUserInfo(order.getUserId(), order.getSubOrders().get(0).getAddressId(),order.getTradeChannel(), null);// 创建订单埋点youShuTrackService.track(TrackFactory.createOrderFactory(order, userInfoDTO), YouShuApiEnum.ORDER_ADD);log.error("[{}] OrderEventHandler.createHandle.event , messageId [{}] ", event.getTradeOrderNo(), messageId);}@Async@EventListenerpublic void paidHandle(OrderPaidEvent event) {RedisKey redisKey = new RedisKey("order", "paid", event.getTradeOrderNo());if (redisTemplate.hasKey(redisKey.toString())) {return;}redisTemplate.opsForValue().set(redisKey.toString(), gson.toJson(event), 60 * 60 * 24, TimeUnit.SECONDS);// 拼团付款成功后 开团或者加团接口Order order = (Order) event.getSource();if (BusinessTypeEnum.AID_GROUP_ORDER.getCode().equals(order.getBizType()) ||BusinessTypeEnum.COMMUNITY_GROUP_ORDER.getCode().equals(order.getBizType())) {aidGroupServiceProxy.createAndJoinGroup(order);}// 使用用户权益benefitServiceProxy.useCouponBenefit(order);}@Async@EventListenerpublic void confirmHandle(OrderConfirmEvent event) {RedisKey redisKey = new RedisKey("order", "confirm", event.getTradeOrderNo());if (redisTemplate.hasKey(redisKey.toString())) {return;}String messageId = mqProductConfig.sendMessage(mqPropertiesConfig.getGroupNameMap().get(MqGroupConstant.GroupType.GID_CONFIRM_ORDER),MqGroupConstant.Tag.CONFIRM_ORDER_TAG + evn,gson.toJson(event.getSource()), 0L);Map<String, String> eventMap = Maps.newHashMap();eventMap.put("event", gson.toJson(event));eventMap.put("source", gson.toJson(event.getSource()));eventMap.put("messageId", messageId);redisTemplate.opsForHash().putAll(redisKey.toString(), eventMap);redisTemplate.expire(redisKey.toString(), 60 * 60 * 24, TimeUnit.SECONDS);Order source = (Order) event.getSource();source.getSubOrders().forEach(subOrder -> {itemLimitHelper.addItemLimitNum(subOrder.getUserId(), subOrder.getBizItemId(),subOrder.getBuyAmount().intValue(), subOrder.getTradeChannel(), source.getBizType());});// 订单 - 确单,埋点youShuTrackService.track(TrackFactory.updateOrderFactory(event.getTradeOrderNo(), TrackOrderStatusEnum.UN_TRANSPORT), YouShuApiEnum.ORDER_UPDATE);log.error("[{}]OrderEventHandler.confirmHandle success , messageId=[{}]", event.getTradeOrderNo(), messageId);}@Async@EventListenerpublic void refundHandle(OrderRefundEvent event) {RefundOrderDTO refundOrderDTO = event.getRefundOrderDTO();log.info("[{}] OrderEventHandler.refundHandle", event.getTradeOrderNo());log.info("refundOrderDTO is{}", JSON.toJSONString(refundOrderDTO));RedisKey redisKey = new RedisKey("order", "refund", event.getTradeOrderNo());if (refundOrderDTO != null) {redisKey = new RedisKey("order", "refund", event.getTradeOrderNo(),CollectionUtils.isEmpty(refundOrderDTO.getTradeSubOrderIdList()) ? "" :Joiner.on(",").join(refundOrderDTO.getTradeSubOrderIdList()));}if (redisTemplate.hasKey(redisKey.toString())) {return;}RefundMqDTO refundMqDTO = new RefundMqDTO();refundMqDTO.setTradeOrderNo(event.getTradeOrderNo());if (refundOrderDTO != null) {refundMqDTO.setRefundFee(refundOrderDTO.getRefundFee());refundMqDTO.setSubOrderId(refundOrderDTO.getTradeSubOrderId());refundMqDTO.setSubOrderIdList(refundOrderDTO.getTradeSubOrderIdList());}String messageId = mqProductConfig.sendMessage(mqPropertiesConfig.getGroupNameMap().get(MqGroupConstant.GroupType.GID_REFUND_PAYMENT),MqGroupConstant.Tag.REFUND_PAYMENT,GsonUtils.gson().toJson(refundMqDTO), 0L);redisTemplate.opsForValue().set(redisKey.toString(), messageId, 3, TimeUnit.DAYS);Order order = (Order) event.getSource();//退款根据benefitServiceProxy.returnCouponBenefit(order);if (Objects.nonNull(refundOrderDTO) && Objects.nonNull(refundOrderDTO.getBenefitRefundFee())&& refundOrderDTO.getBenefitRefundFee() > 0) {benefitServiceProxy.returnBenefitDiscountFee(order.getUserId(), order.getTradeOrderNo(),refundOrderDTO.getTradeSubOrderIdList(), refundOrderDTO.getBenefitRefundFee());}//库存回退if (TradeChannelEnum.WECHAT_SUBSCRIPTION.getCode().equals(order.getTradeChannel())) {Date startDeliveryDate = order.getSubOrders().get(0).getSubOrderLine().getStartDeliveryDate();saleStockOptProxy.returnStock(SaleStockOptProxy.groupBizItemByOrder(order), startDeliveryDate);}//全部退款进行调用es更新兼容拼团退款if (Objects.isNull(refundOrderDTO) || CollectionUtils.isEmpty(refundOrderDTO.getTradeSubOrderIdList())) {orderSyncEsServiceProxy.changeOrderStatus(order.getTradeOrderNo(), OrderPayStatusEnum.CLOSED);}// 退款埋点youShuTrackService.track(TrackFactory.returnOrderFactory(order), YouShuApiEnum.ORDER_RETURN);log.error("[{}]OrderEventHandler.refundHandle success , messageId=[{}]", event.getTradeOrderNo(), messageId);}@Async@EventListenerpublic void cancelHandle(OrderCancelEvent event) {log.info("[{}] OrderEventHandler.cancelHandle", event.getTradeOrderNo());RedisKey redisKey = new RedisKey("order", "cancel", event.getTradeOrderNo());if (redisTemplate.hasKey(redisKey.toString())) {return;}redisTemplate.opsForValue().set(redisKey.toString(), System.currentTimeMillis() + "", 3, TimeUnit.DAYS);Order order = (Order) event.getSource();//权益解锁benefitServiceProxy.unLockCouponBenefit(order);//库存回退if (TradeChannelEnum.WECHAT_SUBSCRIPTION.getCode().equals(order.getTradeChannel())) {log.info("startReturnStock : {}", JSON.toJSONString(SaleStockOptProxy.groupBizItemByOrder(order)));Date startDeliveryDate = order.getSubOrders().get(0).getSubOrderLine().getStartDeliveryDate();saleStockOptProxy.returnStock(SaleStockOptProxy.groupBizItemByOrder(order), startDeliveryDate);}String freezerNum = AttributesUtils.getAttrByKey(order.getAttributes(), "appoint_freezer");String freezerBoxNum = AttributesUtils.getAttrByKey(order.getAttributes(), "appoint_freezer_box");freezerOperationProxy.unlockBox(freezerNum, freezerBoxNum);// 接龙活动套餐库存if (BusinessTypeEnum.JIE_LONG_ORDER.getCode().equals(order.getBizType())) {Long storeId = order.getSubOrders().get(0).getStoreId();String comboId = AttributesUtils.getAttrByKey(order.getAttributes(), "combo_id");if (comboId == null) {log.error("Order content error {}", JSON.toJSONString(order));throw new BusinessException(OrderErrorCode.ITEM_NOT_EXISTS);}comboStoreServiceProxy.returnStock(Long.parseLong(comboId), storeId);}orderSyncEsServiceProxy.changeOrderStatus(order.getTradeOrderNo(), OrderPayStatusEnum.PAID_OUTTIME);// 订单 - 取消 , 埋点youShuTrackService.track(TrackFactory.updateOrderFactory(event.getTradeOrderNo(), TrackOrderStatusEnum.CANCEL_UN_PAY), YouShuApiEnum.ORDER_UPDATE);log.info("[{}]OrderEventHandler.cancelHandle success", event.getTradeOrderNo());}}

推荐阅读