关于ASP.NET Core WebSocket实现集群的思考( 二 )

咱们通过一张图大致的展示一下它的工作方式

关于ASP.NET Core WebSocket实现集群的思考

文章插图
解释一下,每个客户端注册到WebSocket服务里的时候会在redis里订阅一个user:用户唯一标识的频道,这个频道用于接收和当前WebSocket连接不在一个服务端的其他WebSocket发送过来的消息 。每次发送消息的时候你会知道你要发送给谁,不在当前服务器的话则发送到redis的user:用户唯一标识频道,这样的话目标WebSocket就能收到消息了 。首先是注入相关的依赖项 , 这里我使用的redis客户端是freeredis,主要是因为操作起来简单,具体实现代码如下
var builder = WebApplication.CreateBuilder(args);//注册freeredisbuilder.Services.AddSingleton(provider => {var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();RedisClient cli = new RedisClient("127.0.0.1:6379");cli.Notice += (s, e) => logger?.LogInformation(e.Log);return cli;});//注册WebSocket具体操作的类builder.Services.AddSingleton<WebSocketHandler>();builder.Services.AddControllers();var app = builder.Build();var webSocketOptions = new WebSocketOptions{KeepAliveInterval = TimeSpan.FromMinutes(2)};//注册WebSocket中间件app.UseWebSockets(webSocketOptions);app.MapGet("/", () => "Hello World!");app.MapControllers();app.Run();接下来我们定义一个Controller用来处理WebSocket请求
public class WebSocketController : ControllerBase{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketHandler _socketHandler;public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_socketHandler = socketHandler;}//这里的id代表当前连接的客户端唯一标识比如用户唯一标识[HttpGet("/chat/user/{id}")]public async Task ChatUser(string id){//判断是否是WebSocket请求if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//处理请求相关await _socketHandler.Handle(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}}这里的WebSocketHandler是用来处理具体逻辑用的 , 咱们看一下相关代码
public class WebSocketHandler:IDisposable{//存储当前服务用户的集合private readonly UserConnection UserConnection = new();//redis频道前缀private readonly string userPrefix = "user:";//用户对应的redis频道private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly ILogger<WebSocketHandler> _logger;//redis客户端private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task Handle(string id, WebSocket webSocket){//把当前用户连接存储起来_ = UserConnection.GetOrAdd(id, webSocket);//订阅一个当前用户的频道await SubMsg($"{userPrefix}{id}");var buffer = new byte[1024 * 4];//接收发送过来的消息 , 这个方法是阻塞的,如果没收到消息则一直阻塞var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//循环接收消息while (webSocket.State == WebSocketState.Open){try{//因为缓冲区长度是固定的所以要获取实际长度string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//接收的到消息转换成实体MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);//发送到其他客户端的数据byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标客户端是否在当前当前服务,如果在当前服务直接扎到目标连接直接发送if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果要发送的目标端不在当前服务 , 则发送给目标redis端的频道ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };//目标的redis频道_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}//继续阻塞循环接收消息receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//循环结束意味着当前端已经退出//从当前用户的集合移除当前用户_ = UserConnection.TryRemove(id, out _);//关闭当前WebSocket连接await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在当前订阅集合移除当前用户_disposables.TryRemove($"{userPrefix}{id}", out var disposable);//关闭当前用户的通道disposable.Dispose();}private async Task SubMsg(string channel){//订阅当前用户频道var sub = _redisClient.Subscribe(channel,async (channel, data) => {//接收过来当前频道数据,说明发送端不在当前服务ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");//在当前服务找到目标的WebSocket连接并发送消息if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把redis订阅频道添加到集合中_disposables.TryAdd(channel, sub);}//程序退出的时候取消当前服务订阅的redis频道public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}}

推荐阅读