这里涉及到几个辅助相关的类 , 其中UserConnection
类是存储注册到当前服务的连接,MsgBody
类用来接受客户端发送过来的消息,ChannelMsgBody
是用来发送redis频道的相关消息,因为要把相关消息通过redis发布出去 , 咱们列一下这几个类的相关代码
//注册到当前服务的连接public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>{//存储用户唯一标识和WebSocket的对应关系private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();//当前服务的用户数量public int Count => _users.Count;public WebSocket GetOrAdd(string userId, WebSocket webSocket){return _users.GetOrAdd(userId, webSocket);}public bool TryGetValue(string userId, out WebSocket webSocket){return _users.TryGetValue(userId, out webSocket);}public bool TryRemove(string userId, out WebSocket webSocket){return _users.TryRemove(userId, out webSocket);}public void Clear(){_users.Clear();}public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator(){return _users.GetEnumerator();}IEnumerator IEnumerable.GetEnumerator(){return this.GetEnumerator();}}//客户端消息public class MsgBody{//目标用户标识public string Id { get; set; }//要发送的消息public string Msg { get; set; }}//频道订阅消息public class ChannelMsgBody{//用户标识public string FromId { get; set; }//目标用户标识,也就是要发送给谁public string ToId { get; set; }//要发送的消息public string Msg { get; set; }}
这样的话关于一对一发送消息的相关逻辑就实现完成了,启动两个Server端,由于nginx默认的负载均衡策略是轮训,所以注册两个用户的话会被分发到不同的服务里去
文章插图
文章插图
用
Postman
连接三个连接唯一标识分别是1、2、3
, 模拟一下消息发送,效果如下,发送效果文章插图
接收效果
文章插图
群组发送上面我们展示了一对一发送的情况,接下来我们来看一下,群组发送的情况 。群组发送的话就是只要大家都加入一个群组 , 只要客户端在群组里发送一条消息,则注册到当前群组内的所有客户端都可以收到消息 。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组,则当前当前WebSocket服务端则可以订阅一个
group:群组唯一标识
的redis频道,集群中的其他WebSocket服务器通过这个redis频道接收群组消息,通过一张图描述一下文章插图
群组的实现方式相对于一对一要简单一点
- 发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去
- 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息
//包含两个标识一个是组别标识一个是注册到组别的用户[HttpGet("/chat/group/{groupId}/{userId}")]public async Task ChatGroup(string groupId, string userId){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//调用HandleGroup处理群组相关的消息await _socketHandler.HandleGroup(groupId, userId, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
接下来看一下HandleGroup的相关逻辑,还是在WebSocketHandler类中 , 看一下代码实现public class WebSocketHandler:IDisposable{private readonly UserConnection UserConnection = new();private readonly GroupUser GroupUser = new();private readonly SemaphoreSlim _lock = new(1, 1);private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string groupPrefix = "group:";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleGroup(string groupId, string userId, WebSocket webSocket){//因为群组的集合可能会存在很多用户一起访问所以限制访问数量await _lock.WaitAsync();//初始化群组容器 群唯一标识为key 群员容器为valuevar currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });//当前用户加入当前群组_ = currentGroup.GetOrAdd(userId, webSocket);//只有有当前WebSocket服务的第一个加入当前组的时候才去订阅群组频道//如果不限制的话则会出现如果当前WebSocket服务有多个用户在一个组内则会重复收到redis消息if (currentGroup.Count == 1){//订阅redis频道await SubGroupMsg($"{groupPrefix}{groupId}");}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收WebSocket消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//服务不退出的话则一直等待接收while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");//组装redis频道发布的消息,目标为群组标识ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };//通过redis发布消息_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//如果客户端退出则在当前群组集合删除当前用户_ = currentGroup.TryRemove(userId, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubGroupMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//在当前WebSocket服务器找到当前群组里的用户GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);//循环当前WebSocket服务器里的用户发送消息foreach (var user in currentGroup){//不用给自己发送了if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把当前频道加入订阅集合_disposables.TryAdd(channel, sub);}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 关于入门深度学习mnist数据集前向计算的记录
- 《ASP.NET Core技术内幕与项目实战》精简集-目录
- 华为开发者大会HDC2022:HMS Core 持续创新,与开发者共创美好数智生活
- 上 学习ASP.NET Core Blazor编程系列十——路由
- 手记系列之二 ----- 关于IDEA的一些使用方法经验
- 重新整理 .net core 实践篇 ———— linux上性能排查 [外篇]
- 重新整理 .net core 实践篇 ———— linux上排查问题实用工具 [外篇]
- 华为开发者大会2022:HMS Core 3D建模服务再升级,万物皆可驱动
- 两种 .Net Core 3.0 对 MongoDB 的多条件查询操作
- 【JVM】关于JVM,你需要掌握这些 | 一文彻底吃透JVM系列