关于ASP.NET Core WebSocket实现集群的思考( 五 )

  • 一对一发送的时候,只需要在具体的服务器中找到具体的客户端发送消息
  • 群组的时候,先把当前用户标识加入群组集合即可,接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的WebSocket连接发送消息
  • 全员消息的时候 , 直接遍历集群中的每个WebSocket服务里的用户集合里的WebSocket连接训话发送消息
  • 这样的话就保证了每个客户端用户在集群中只会绑定一个连接,首先还是单独定义一个action,用于让客户端用户连接上来,具体实现代码如下所示
    public class WebSocketChannelController : ControllerBase{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketChannelHandler _webSocketChannelHandler;public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_webSocketChannelHandler = webSocketChannelHandler;}//只需要把当前用户连接到服务即可[HttpGet("/chat/channel/{id}")]public async Task Channel(string id){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _webSocketChannelHandler.HandleChannel(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}}接下来看一下WebSocketChannelHandler类的HandleChannel方法实现 , 用于处理不同的消息,比如一对一、群组、全员消息等不同类型的消息
    public class WebSocketChannelHandler : IDisposable{//用于存储当前WebSocket服务器链接上来的所有用户对应关系private readonly UserConnection UserConnection = new();//用于存储群组和用户关系,用户集合采用HashSet保证每个用户只加入一个群组一次private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();private readonly SemaphoreSlim _lock = new(1, 1);//存放redis订阅实例private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();//一对一redis频道前缀private readonly string userPrefix = "user:";//群组redis频道前缀private readonly string groupPrefix = "group:";//全员redis频道private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleChannel(string id, WebSocket webSocket){await _lock.WaitAsync();//每次连接进来就添加到用户集合_ = UserConnection.GetOrAdd(id, webSocket);//每个WebSocket服务实例只需要订阅一次全员消息频道await SubMsg($"{userPrefix}{id}");if (UserConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//接收客户端消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//读取客户端消息ChannelData channelData = https://www.huyubaike.com/biancheng/JsonConvert.DeserializeObject(msg);//判断消息类型switch (channelData.Method){//一对一case"One":await HandleOne(id, channelData.MsgBody, receiveResult);break;//把用户加入群组case "UserGroup":await AddUserGroup(id, channelData.Group, webSocket);break;//处理群组消息case "Group":await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);break;//处理全员消息default:await HandleAll(id, channelData.MsgBody);break;}receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在群组中移除当前用户foreach (var users in GroupUser.Values){lock (users){users.Remove(id);}}//当前客户端用户退出则移除连接_ = UserConnection.TryRemove(id, out _);//取消用户频道订阅_disposables.Remove($"{userPrefix}{id}", out var sub);sub?.Dispose();}public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}}这里涉及到了ChannelData类是用于接收客户端消息的类模板 , 具体定义如下
    public class ChannelData{//消息类型 比如一对一 群组 全员public string Method { get; set; }//群组标识public string Group { get; set; }//消息体public object MsgBody { get; set; }}类中并不会包含当前用户信息,因为连接到当前服务的时候已经提供了客户端唯一标识 。结合上面的处理代码我们可以看出,客户端用户连接到WebSocket实例之后,先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道,用于处理非当前实例注册客户端的一对一消息处理和全员消息处理,然后等待接收客户端消息,根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理,它的工作流程入下图所示

    推荐阅读