Websocket集群解决方案( 二 )

websocket实现消息的推送 。
1. 添加依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2. 创建 ServerEndpointExporter 的 bean 实例ServerEndpointExporter 的 bean 实例自动注册 @ServerEndpoint 注解声明的 websocket endpoint,使用springboot自带tomcat启动需要该配置,使用独立 tomcat 则不需要该配置 。
@Configurationpublic class WebSocketConfig {//tomcat启动无需该配置@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}3. 创建服务端点 ServerEndpoint 和 客户端端

  • 服务端点
@Component@ServerEndpoint(value = "https://www.huyubaike.com/message")@Slf4jpublic class WebSocket { private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>(); private Session session; @OnOpen public void onOpen(Session session) throws SocketException {this.session = session;webSocketSet.put(this.session.getId(),this);log.info("【websocket】有新的连接,总数:{}",webSocketSet.size()); } @OnClose public void onClose(){String id = this.session.getId();if (id != null){webSocketSet.remove(id);log.info("【websocket】连接断开:总数:{}",webSocketSet.size());} } @OnMessage public void onMessage(String message){if (!message.equals("ping")){log.info("【wesocket】收到客户端发送的消息,message={}",message);sendMessage(message);} } /*** 发送消息* @param message* @return*/ public void sendMessage(String message){for (WebSocket webSocket : webSocketSet.values()) {webSocket.session.getAsyncRemote().sendText(message);}log.info("【wesocket】发送消息,message={}", message); }}
  • 客户端点
【Websocket集群解决方案】<div><input type="text" name="message" id="message"><button id="sendBtn">发送</button></div><div style="width:100px;height: 500px;" id="content"></div><script src="http://img.zhejianglong.com/231019/19122U1Y-6.jpg"></script><script type="text/javascript">var ws = new WebSocket("ws://127.0.0.1:8080/message");ws.onopen = function(evt) {console.log("Connection open ...");};ws.onmessage = function(evt) {console.log( "Received Message: " + evt.data);var p = $("<p>"+evt.data+"</p>")$("#content").prepend(p);$("#message").val("");};ws.onclose = function(evt) {console.log("Connection closed.");};$("#sendBtn").click(function(){var aa = $("#message").val();ws.send(aa);})</script>服务端和客户端中的OnOpenoncloseonmessage都是一一对应的 。
  • 服务启动后 , 客户端ws.onopen调用服务端的@OnOpen注解的方法,储存客户端的session信息 , 握手建立连接 。
  • 客户端调用ws.send发送消息,对应服务端的@OnMessage注解下面的方法接收消息 。
  • 服务端调用session.getAsyncRemote().sendText发送消息 , 对应的客户端ws.onmessage接收消息 。
添加 controller@GetMapping({"","index.html"})public ModelAndView index() { ModelAndView view = new ModelAndView("index"); return view;}效果展示打开两个客户端,其中的一个客户端发送消息,另一个客户端也能接收到消息 。
Websocket集群解决方案

文章插图
添加 RabbitMQ 中间件这里使用比较常用的RabbitMQ作为消息中间件,而RabbitMQ支持发布订阅模式:
Websocket集群解决方案

文章插图
添加消息订阅交换机使用扇形交换机,消息分发给每一条绑定该交换机的队列 。以服务器所在的IP + 端口作为唯一标识作为队列的命名 , 启动一个服务,使用队列绑定交换机,实现消息的订阅:
@Configurationpublic class RabbitConfig {@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");}@Beanpublic Queue psQueue() throws SocketException {// ip + 端口 为队列名String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();return new Queue("ps_" + ip);}@Beanpublic Binding routingFirstBinding() throws SocketException {return BindingBuilder.bind(psQueue()).to(fanoutExchange());}}

推荐阅读