这里涉及到了GroupUser
类 , 是来存储群组和群组用户的对应关系的 , 定义如下
public class GroupUser{//key为群组的唯一标识public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();}
演示一下把两个用户添加到一个群组内,然后发送接收消息的场景,用户u1发送
文章插图
用户u2接收
文章插图
发送所有人发送给所有用户的逻辑比较简单 , 不用考虑到用户限制 , 只要用户连接到了WebSocket集群则都可以接收到这个消息,大致工作方式如下图所示
文章插图
这个比较简单,咱们直接看实现代码,首先是定义一个地址,用于发布消息
//把用户注册进去[HttpGet("/chat/all/{id}")]public async Task ChatAll(string id){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _socketHandler.HandleAll(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
具体的实现逻辑还是在HandleGroup类里 , 是HandleAll方法 , 看一下具体实现public class WebSocketHandler:IDisposable{private readonly UserConnection AllConnection = new();private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleAll(string id, WebSocket webSocket){await _lock.WaitAsync();//把用户加入用户集合_ = AllConnection.GetOrAdd(id, webSocket);//WebSocket集群中的每个服务只定义一次if (AllConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收信息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"user {id} send:{msg}");//获取接收信息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };//把消息通过redis发布到集群中的其他服务_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//用户退出则删除集合中的当前用户信息_ = AllConnection.TryRemove(id, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubAllMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//接收到消息后遍历用户集合把消息发送给所有用户foreach (var user in AllConnection){//如果包含当前用户跳过if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});_disposables.TryAdd(channel, sub);}}
效果在这里就不展示了,和群组的效果是类似的,只是一个是部分用户 , 一个是全部的用户 。整合到一起上面我们分别展示了一对一、群组、所有人的场景,但是实际使用的时候,每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可,而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接 。所以我们需要把上面的演示整合一下,一个用户只需要连接到WebSocket集群一次即可,至于发送给谁 , 加入什么群组,接收全部消息等都是连接后通过一些标识区分的 , 而不必每个类型的操作都注册一次,就和微信和QQ一样我只要登录了即可,至于其他操作都是靠数据标识区分的 。接下来咱们就整合一下代码达到这个效果,大致的思路是
- 用户连接到WebSocket集群,把用户和连接保存到当前WebSocket服务器的用户集合中去 。
推荐阅读
- 关于入门深度学习mnist数据集前向计算的记录
- 《ASP.NET Core技术内幕与项目实战》精简集-目录
- 华为开发者大会HDC2022:HMS Core 持续创新,与开发者共创美好数智生活
- 上 学习ASP.NET Core Blazor编程系列十——路由
- 手记系列之二 ----- 关于IDEA的一些使用方法经验
- 重新整理 .net core 实践篇 ———— linux上性能排查 [外篇]
- 重新整理 .net core 实践篇 ———— linux上排查问题实用工具 [外篇]
- 华为开发者大会2022:HMS Core 3D建模服务再升级,万物皆可驱动
- 两种 .Net Core 3.0 对 MongoDB 的多条件查询操作
- 【JVM】关于JVM,你需要掌握这些 | 一文彻底吃透JVM系列