关于ASP.NET Core WebSocket实现集群的思考( 六 )


关于ASP.NET Core WebSocket实现集群的思考

文章插图
由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式 。
一对一处理首先是一对一的消息处理情况,看一下具体的处理逻辑,首先是一对一发布消息
private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult) {MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标用户是否在当前WebSocket服务器if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果不在当前服务器,则直接把消息发布到具体的用户频道去,由具体用户去订阅ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}}接下来是用于处理订阅其他用户发送过来消息的逻辑,这个和整合之前的逻辑是一致的,在当前服务器中找到用户对应的连接,发送消息
private async Task SubMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(msgBody.FromId, out _);}}});//把订阅实例加入集合_disposables.TryAdd(channel, sub);}如果给某个用户发送消息则可以使用如下的消息格式
{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}Method为One代表着是私聊一对一的情况 , 消息体内Id为要发送给的具体用户标识和消息体 。
群组处理接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个WebSocket连接 , 首先看用户加入群组的逻辑
private async Task AddUserGroup(string user, string group, WebSocket webSocket){//获取群组信息var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());lock (currentGroup){//把用户标识加入当前组_ = currentGroup.Add(user);}//每个组的redis频道,在每台WebSocket服务器实例只注册一次订阅if (currentGroup.Count == 1){//订阅当前组消息await SubGroupMsg($"{groupPrefix}{group}");}string addMsg = $"user 【{user}】 addto group 【{group}】";byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);//如果有用户加入群组,则通知其他群成员ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };_redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));}用户想要在群组内发消息,则必须先加入到一个具体的群组内 , 具体的加入群组的格式如下
{"Method":"UserGroup", "Group":"g1"}Method为UserGroup代表着用户加入群组的业务类型 , Group代表着你要加入的群组唯一标识 。接下来就看下,用户发送群组消息的逻辑了
private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody){//判断群组是否存在var hasValue = https://www.huyubaike.com/biancheng/GroupUser.TryGetValue(groupId, out var users);if (!hasValue){byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}//只有加入到当前群组,才能在群组内发送消息if (!users.Contains(userId)){byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");//发送群组消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));}

推荐阅读