关于ASP.NET Core WebSocket实现集群的思考( 七 )

加入群组之后则可以发送和接收群组内的消息了,给群组发送消息的格式如下
{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}Method为Group代表着用户加入群组的业务类型 , Group则代表你要发送到具体的群组的唯一标识,MsgBody则是发送到群组内的消息 。最后再来看下订阅群组内消息的情况,也就是处理群组消息的逻辑
private async Task SubGroupMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) =>{//接收群组订阅消息ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//获取当前服务器实例中当前群组的所有用户连接GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);foreach (var user in currentGroup){if (user == msgBody.FromId){continue;}//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{currentGroup.Remove(user);}}});_disposables.TryAdd(channel, sub);}全员消息处理全员消息处理相对来说思路比较简单,因为当服务启动的时候就会监听redis的全员消息频道,这样的话具体的实现也就只包含发送和接收全员消息了,首先看一下全员消息发送的逻辑
private async Task HandleAll(string id, object msgBody){_logger.LogInformation($"user {id} send:{msgBody}");//直接给redis的全员频道发送消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));}全员消息的发送数据格式如下所示
{"Method":"All", "MsgBody":"Hello All"}Method为All代表着全员消息类型,MsgBody则代表着具体消息 。接收消息出里同样很简单,订阅redis全员消息频道,然后遍历当前WebSocket服务器实例内的所有用户获取连接发送消息,具体逻辑如下
private async Task SubAllMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//获取当前服务器实例内所有用户的连接foreach (var user in UserConnection){//不给自己发送消息,因为发送的时候可以通过具体的业务代码处理if (user.Key == msgBody.FromId){continue;}//给每个用户发送消息if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(user.Key, out _);}}});_disposables.TryAdd(channel, sub);}示例源码由于篇幅有限,没办法设计到全部的相关源码 , 因此在这里贴出来github相关的地址,方便大家查看和运行源码 。相关的源码我这里实现了两个版本,一个是基于asp.net core的版本,一个是基于golang的版本 。两份源码的实现思路是一致的,所以这两份代码可以运行在一套集群示例里 , 配置在一套nginx里,并且连接到同一个redis实例里即可

  • asp.net core源码示例 https://github.com/softlgl/WebsocketCluster
  • golang源码示例 https://github.com/softlgl/websocket-cluster
仓库里还涉及到本人闲暇之余开源的其他仓库,由于本人能力有限难登大雅之堂,就不做广告了,有兴趣的同学可以自行浏览一下 。
总结本文基于ASP.NET Core框架提供了一个基于WebSocket做集群的示例,由于思想是通用的,所以基于这个思路楼主也实现了golang版本 。其实在之前就想自己动手搞一搞关于WebSocket集群方面的设计 , 本篇文章算是对之前想法的一个落地操作 。其核心思路文章已经做了相关介绍,由于这些只是博主关于构思的实现,可能有很多细节尚未体现到,还希望大家多多理解 。其核心思路总结一下