Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

大家好,我是失业在家,正在找工作的博主Jerry,找工作之余,总结和整理以前的项目经验 , 动手写了个洋葱架构(整洁架构)示例解决方案 OnionArch 。其目的是为了更好的实现基于DDD(领域驱动分析)和命令查询职责分离(CQRS)的洋葱架构 。
OnionArch 是用来实现单个微服务的 。它提供了Grpc接口和Dapr Side Car进行交互,通过Dapr来实现微服务之间的接口调用、事件发布订阅等微服务特性 。但是,Dapr官方文档上只有Go语言的Grpc的微服务调用示例 , 没有事件发布和订阅示例,更没有基于Grpc通讯用.Net实现的事件订阅和发布示例 。
一、实现目标为了方便大家写代码 , 本文旨在介绍如何通过Dapr实现.Net Grpc服务之间的发布和订阅,并采用与WebApi类似的事件订阅方式 。
【Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式】如果是Dapr Side Car通过Web Api和微服务引用交互,在WebApi中实现事件订阅非常简单 , 只要在Action 上增加“[Topic("pubsub", "TestTopic")]” Attribute即可,可如果Dapr是通过Grpc和Grpc服务交互就不能这样写了 。
为了保持WebApi和Grpc事件订阅代码的一致性,本文就是要在Grpc通讯的情况下实现如下写法来订阅并处理事件 。
[Topic("pubsub", "TestTopic")]public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context){string message = "TestTopicEvent" + request.EventData.Name;Console.WriteLine(message);return Task.FromResult(new HelloReply{Message = message});}二、实现方案Dapr实现.Net Grpc服务之间的发布和订阅 , 根据官方文档,需要重写AppCallback.AppCallbackBase Grpc类的ListTopicSubscriptions方法和OnTopicEvent方法,ListTopicSubscriptions是给Dapr调用获取该微服务已订阅的事件,OnTopicEvent给Dapr调用以触发事件到达处理逻辑 。但是这样就需要在AppCallback.AppCallbackBase实现类中硬编码已订阅的事件和事件处理逻辑 。显然不符合我们的实现目标 。
参考Dapr SDK中关于WebApi 订阅查询接口“http://localhost:<appPort>/dapr/subscribe”的实现代码 , 可以在AppCallback.AppCallbackBase实现类的ListTopicSubscriptions方法中,采用相同的方式,在Grpc方法中查询Topic Attribute的方式来搜索已订阅的事件 。这样就不用在ListTopicSubscriptions中硬编码已订阅的事件了 。
为了避免在OnTopicEvent方法中应编码事件处理逻辑,就需要在接收到事件触发后动态调用Grpc方法 。理论上 , 只要有proto文件就可以动态调用Grpc方法,而proto文件本来就在项目中 。但是,我没找到.Net动态调用Grpc方法的相关资料,不知道大家有没有?
我这里采用了另一种方式 , 根据我上一篇关于.Net 7.0 RC gRPC JSON 转码为 Swagger/OpenAPI文档 。Grpc方法可以增加一个转码为Json的WebApi调用 。这样就可以在OnTopicEvent方法中接收到事件触发后,通过HttpClient post到对应的WebApi地址,曲线实现动态调用Grpc方法 。是不是有点脱裤子放屁的感觉?
三、代码实现我的解决方案如下,GrpcServiceA发布事件,GrpcServiceB接收事件并处理 。

Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

文章插图
实现事件发布GrpcServiceA发布事件比较简单,和WebApi的方式是一样一样的 。
public async override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context){//await _daprClient.SaveStateAsync("statestore", "testKey", request.Name);EventData eventData = https://www.huyubaike.com/biancheng/new EventData() { Id = 6, Name = request.Name, Description ="Looking for a job" };await _daprClient.PublishEventAsync<EventData>("pubsub", "TestTopic", eventData);return new HelloReply{Message = "Hello" + request.Name};}_daprClient怎么来的?我参考Dapr .Net SDK的代码 , 给IGrpcServerBuilder 增加了扩展方法:
Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

文章插图
Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式

文章插图
public static IGrpcServerBuilder AddDapr(this IGrpcServerBuilder builder, Action<DaprClientBuilder> configureClient = null){if (builder is null){throw new ArgumentNullException(nameof(builder));}// This pattern prevents registering services multiple times in the case AddDapr is called// by non-user-code.if (builder.Services.Any(s => s.ImplementationType == typeof(DaprMvcMarkerService))){return builder;}builder.Services.AddDaprClient(configureClient);builder.Services.AddSingleton<DaprMvcMarkerService>();return builder;}private class DaprMvcMarkerService{}

推荐阅读