Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式( 二 )

然后就可以这样把DaprClient依赖注入到服务中 。
builder.Services.AddGrpc().AddJsonTranscoding().AddDapr();
实现事件订阅
根据上述实现方案,GrpcServiceB接收事件并处理有点复杂,参考我Grpc接口转码Json的内容,在要接收事件的Grpc方法上增加转码Json WebApi配置。
rpc TestTopicEvent (TestTopicEventRequest) returns (HelloReply){option (google.api.http) = {post: "/v1/greeter/testtopicevent",body: "eventData"};}
增加google.api.http选项,可以通过post eventData数据到地址“/v1/greeter/testtopicevent”调用该Grpc方法。然后实现该Grpc接口。
[Topic("pubsub", "TestTopic")]public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context){string message = "TestTopicEvent" + request.EventData.Name;Console.WriteLine(message);return Task.FromResult(new HelloReply{Message = message});}
我重用了Dapr .Net SDK的Topic Attribute来标记该Grpc的实现接口,这样就可以搜索所有带Topic Attribute的Grpc方法来获取已经订阅的事件。
接下来才是重头戏,重写AppCallback.AppCallbackBase Grpc接口类的ListTopicSubscriptions方法和OnTopicEvent方法
public async override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context){var result = new ListTopicSubscriptionsResponse();var subcriptions = _endpointDataSource.GetDaprSubscriptions(_loggerFactory);foreach (var subscription in subcriptions){TopicSubscription subscr = new TopicSubscription(){PubsubName = subscription.PubsubName,Topic = subscription.Topic,Routes = new TopicRoutes()};subscr.Routes.Default = subscription.Route;result.Subscriptions.Add(subscr);}return result;}
该方法返回所有已订阅的事件和对应的WebApi Url,将事件对应的WebApi地址放入subscr.Routes.Default中。
其中_endpointDataSource.GetDaprSubscriptions 方法参考了Dapr .Net SDK的实现 。
/// <summary>
/// Collects the Dapr topic subscriptions declared through topic metadata on the route
/// endpoints exposed by <paramref name="dataSource"/>.
/// </summary>
/// <remarks>
/// Entries are grouped by (pubsub name, topic name) and ordered by priority inside each group.
/// When a group contains several default (match-less) routes, the LAST one is used and an
/// error is logged.
/// NOTE(review): the last route is presumably preferred so that the JSON-transcoded HTTP
/// route wins over the native gRPC route of the same method - confirm against the
/// endpoint registration order.
/// </remarks>
/// <param name="dataSource">Endpoint source whose <c>RouteEndpoint</c> entries are scanned.</param>
/// <param name="loggerFactory">
/// Factory used to create the "DaprTopicSubscription" logger. When null, diagnostics are skipped.
/// </param>
/// <param name="options">
/// Optional settings; when <c>EnableRawPayload</c> is true the raw-payload metadata entry is
/// added to every subscription.
/// </param>
/// <returns>The subscriptions, ordered by pubsub name and then topic.</returns>
public static List<Subscription> GetDaprSubscriptions(this EndpointDataSource dataSource, ILoggerFactory loggerFactory, SubscribeOptions options = null)
{
    // Null-conditional so that the "logger != null" guard below is actually reachable.
    var logger = loggerFactory?.CreateLogger("DaprTopicSubscription");

    var subscriptions = dataSource.Endpoints
        .OfType<RouteEndpoint>()
        // Only endpoints which have topic metadata with a non-null Name.
        .Where(e => e.Metadata.GetOrderedMetadata<ITopicMetadata>().Any(t => t.Name != null))
        .SelectMany(e =>
        {
            var topicMetadata = e.Metadata.GetOrderedMetadata<ITopicMetadata>();
            var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>();

            var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();

            foreach (var topic in topicMetadata)
            {
                var owned = topic as IOwnedOriginalTopicMetadata;

                // Keep the original metadata entries owned by this topic, plus the entries
                // without an Id; values are grouped per metadata name.
                var ownedMetadata = originalTopicMetadata
                    .Where(m => owned?.OwnedMetadatas?.Any(o => o.Equals(m.Id)) == true || string.IsNullOrEmpty(m.Id))
                    .GroupBy(m => m.Name)
                    .ToDictionary(g => g.Key, g => g.Select(m => m.Value).Distinct().ToArray());

                subs.Add((
                    topic.PubsubName,
                    topic.Name,
                    (topic as IDeadLetterTopicMetadata)?.DeadLetterTopic,
                    (topic as IRawTopicMetadata)?.EnableRawPayload,
                    topic.Match,
                    topic.Priority,
                    ownedMetadata,
                    owned?.MetadataSeparator,
                    e.RoutePattern));
            }

            return subs;
        })
        .Distinct()
        .GroupBy(s => new { s.PubsubName, s.Name })
        // Materialize once: the ordered group is enumerated several times below.
        .Select(g => g.OrderBy(s => s.Priority).ToList())
        .Select(g =>
        {
            var first = g.First();
            var rawPayload = g.Any(s => s.EnableRawPayload.GetValueOrDefault());
            var metadataSeparator = g.FirstOrDefault(s => !string.IsNullOrEmpty(s.MetadataSeparator)).MetadataSeparator ?? ",";
            var rules = g.Where(s => !string.IsNullOrEmpty(s.Match)).ToList();
            var defaultRoutes = g.Where(s => string.IsNullOrEmpty(s.Match)).Select(s => RoutePatternToString(s.RoutePattern)).ToList();

            // The last default route is used here (see the remarks on this method).
            var defaultRoute = defaultRoutes.LastOrDefault();

            // Multiple identical metadata names: join their distinct values with the separator.
            var metadata = new Metadata(g
                .SelectMany(s => s.OriginalTopicMetadata)
                .GroupBy(kv => kv.Key)
                .ToDictionary(kv => kv.Key, kv => string.Join(metadataSeparator, kv.SelectMany(v => v.Value).Distinct())));

            if (rawPayload || options?.EnableRawPayload is true)
            {
                metadata.Add(Metadata.RawPayload, "true");
            }

            if (logger != null)
            {
                if (defaultRoutes.Count > 1)
                {
                    logger.LogError("A default subscription to topic {name} on pubsub {pubsub} already exists.", first.Name, first.PubsubName);
                }

                var duplicatePriorities = rules
                    .GroupBy(r => r.Priority)
                    .Where(p => p.Count() > 1)
                    .ToDictionary(p => p.Key, p => p.Count());

                foreach (var entry in duplicatePriorities)
                {
                    logger.LogError("A subscription to topic {name} on pubsub {pubsub} has duplicate priorities for {priority}: found {count} occurrences.", first.Name, first.PubsubName, entry.Key, entry.Value);
                }
            }

            var subscription = new Subscription()
            {
                Topic = first.Name,
                PubsubName = first.PubsubName,
                Metadata = metadata.Count > 0 ? metadata : null,
            };

            if (first.DeadLetterTopic != null)
            {
                subscription.DeadLetterTopic = first.DeadLetterTopic;
            }

            if (rules.Count > 0)
            {
                // Use the V2 routing rules structure.
                subscription.Routes = new Routes
                {
                    Rules = rules.Select(r => new Rule
                    {
                        Match = r.Match,
                        Path = RoutePatternToString(r.RoutePattern),
                    }).ToList(),
                    Default = defaultRoute,
                };
            }
            else
            {
                // Use the V1 structure for backward compatibility.
                subscription.Route = defaultRoute;
            }

            return subscription;
        })
        .OrderBy(s => (s.PubsubName, s.Topic));

    return subscriptions.ToList();
}

/// <summary>
/// Renders a route pattern as its path segments joined with "/".
/// </summary>
/// <remarks>
/// Every segment part is cast to <c>RoutePatternLiteralPart</c>, so a pattern containing a
/// non-literal part (for example a route parameter) makes the cast throw.
/// </remarks>
/// <param name="routePattern">The pattern to render.</param>
/// <returns>The concatenated literal content of each segment, separated by "/".</returns>
private static string RoutePatternToString(RoutePattern routePattern)
{
    return string.Join("/", routePattern.PathSegments.Select(segment => string.Concat(segment.Parts.Cast<RoutePatternLiteralPart>().Select(part => part.Content))));
}

推荐阅读