文章插图
由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式 。
一对一处理首先是一对一的消息处理情况,看一下具体的处理逻辑,首先是一对一发布消息
private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult) {MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标用户是否在当前WebSocket服务器if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果不在当前服务器,则直接把消息发布到具体的用户频道去,由具体用户去订阅ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}}
接下来是用于处理订阅其他用户发送过来消息的逻辑,这个和整合之前的逻辑是一致的,在当前服务器中找到用户对应的连接,发送消息private async Task SubMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(msgBody.FromId, out _);}}});//把订阅实例加入集合_disposables.TryAdd(channel, sub);}
如果给某个用户发送消息则可以使用如下的消息格式{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}
Method为One代表着是私聊一对一的情况 , 消息体内Id为要发送给的具体用户标识和消息体 。群组处理接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个WebSocket连接 , 首先看用户加入群组的逻辑
private async Task AddUserGroup(string user, string group, WebSocket webSocket){//获取群组信息var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());lock (currentGroup){//把用户标识加入当前组_ = currentGroup.Add(user);}//每个组的redis频道,在每台WebSocket服务器实例只注册一次订阅if (currentGroup.Count == 1){//订阅当前组消息await SubGroupMsg($"{groupPrefix}{group}");}string addMsg = $"user 【{user}】 addto group 【{group}】";byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);//如果有用户加入群组,则通知其他群成员ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };_redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));}
用户想要在群组内发消息,则必须先加入到一个具体的群组内 , 具体的加入群组的格式如下{"Method":"UserGroup", "Group":"g1"}
Method为UserGroup代表着用户加入群组的业务类型 , Group代表着你要加入的群组唯一标识 。接下来就看下,用户发送群组消息的逻辑了private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody){//判断群组是否存在var hasValue = https://www.huyubaike.com/biancheng/GroupUser.TryGetValue(groupId, out var users);if (!hasValue){byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}//只有加入到当前群组,才能在群组内发送消息if (!users.Contains(userId)){byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");//发送群组消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 关于入门深度学习mnist数据集前向计算的记录
- 《ASP.NET Core技术内幕与项目实战》精简集-目录
- 华为开发者大会HDC2022:HMS Core 持续创新,与开发者共创美好数智生活
- 上 学习ASP.NET Core Blazor编程系列十——路由
- 手记系列之二 ----- 关于IDEA的一些使用方法经验
- 重新整理 .net core 实践篇 ———— linux上性能排查 [外篇]
- 重新整理 .net core 实践篇 ———— linux上排查问题实用工具 [外篇]
- 华为开发者大会2022:HMS Core 3D建模服务再升级,万物皆可驱动
- 两种 .Net Core 3.0 对 MongoDB 的多条件查询操作
- 【JVM】关于JVM,你需要掌握这些 | 一文彻底吃透JVM系列