引言
文章插图
A free, open-source distributed application framework for .NET. 一个免费、开源的.NET 分布式应用框架 。-- MassTransit 官网MassTransit,直译公共交通,是由
Chris Patterson
开发的基于消息驱动的.NET 分布式应用框架,其核心思想是借助消息来实现服务之间的松耦合异步通信,进而确保应用更高的可用性、可靠性和可扩展性 。通过对消息模型的高度抽象 , 以及对主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大简化了基于消息驱动的开发门槛,同时内置了连接管理、消息序列化和消费者生命周期管理 , 以及诸如重试、限流、断路器等异常处理机制,让开发者更好的专注于业务实现 。简而言之 , MassTransit实现了消息代理透明化 。无需面向消息代理编程进行诸如连接管理、队列的申明和绑定等操作 , 即可轻松实现应用间消息的传递和消费 。快速体验空口无凭,创建一个项目快速体验一下 。
- 基于
worker
模板创建一个基础项目:dotnet new worker -n MassTransit.Demo
- 打开项目,添加NuGet包:
MassTransit
- 定义订单创建事件消息契约:
using System;namespace MassTransit.Demo{public record OrderCreatedEvent{public Guid OrderId { get; set; }}}
- 修改
Worker
类,发送订单创建事件:
namespace MassTransit.Demo;public class Worker : BackgroundService{readonly IBus _bus;//注册总线public Worker(IBus bus){_bus = bus;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){//模拟并发送订单创建事件await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);await Task.Delay(1000, stoppingToken);}}}
- 仅需实现
IConsumer<OrderCreatedEvent>
泛型接口 , 即可实现消息的订阅:
public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>{private readonly ILogger<OrderCreatedEventConsumer> _logger;public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger){_logger = logger;}public Task Consume(ConsumeContext<OrderCreatedEvent> context){_logger.LogInformation($"Received Order:{context.Message.OrderId}");return Task.CompletedTask;}}
- 注册服务:
using MassTransit;using MassTransit.Demo;IHost host = Host.CreateDefaultBuilder(args).ConfigureServices(services =>{services.AddHostedService<Worker>();services.AddMassTransit(configurator =>{//注册消费者configurator.AddConsumer<OrderCreatedEventConsumer>();//使用基于内存的消息路由传输configurator.UsingInMemory((context, cfg) =>{cfg.ConfigureEndpoints(context);});});}).Build();await host.RunAsync();
- 运行项目,一个简单的进程内事件发布订阅的应用就完成了 。
MassTransit.RabbitMQ
NuGet包 , 然后指定使用RabbitMQ 传输消息即可 。using MassTransit;using MassTransit.Demo;IHost host = Host.CreateDefaultBuilder(args).ConfigureServices(services =>{services.AddHostedService<Worker>();services.AddMassTransit(configurator =>{configurator.AddConsumer<OrderCreatedEventConsumer>();// configurator.UsingInMemory((context, cfg) =>// {//cfg.ConfigureEndpoints(context);// });configurator.UsingRabbitMq((context, cfg) =>{cfg.Host(host: "localhost",port: 5672,virtualHost: "/",configure: hostConfig =>{hostConfig.Username("guest");hostConfig.Password("guest");});cfg.ConfigureEndpoints(context);});});}).Build();await host.RunAsync();
运行项目 , MassTransit会自动在指定的RabbitMQ上创建一个类型为fanout
的MassTransit.Demo.OrderCreatedEvent
Exchange和一个与OrderCreatedEvent
同名的队列进行消息传输,如下图所示 。文章插图
核心概念MassTranist 为了实现消息代理的透明化和应用间消息的高效传输,抽象了以下概念,其中消息流转流程如下图所示:
- Message:消息契约,定义了消息生产者和消息消费者之间的契约 。
- Producer:生产者,发送消息的一方都可以称为生产者 。
- SendEndpoint:发送端点,用于将消息内容序列化,并发送到传输模块 。
- Transport:传输模块,消息代理透明化的核心,用于和消息代理通信,负责发送和接收消息 。
- ReceiveEndpoint:接收端点,用于从传输模块接收消息,反序列化消息内容,并将消息路由到消费者 。
推荐阅读
- 4 .NET 6学习笔记——如何在.NET 6的Desktop App中使用Windows Runtime API
- 学习ASP.NET Core Blazor编程系列八——数据校验
- 【.NET 6】RabbitMQ延迟消费指南
- 简读《ASP.NET Core技术内幕与项目实战》之3:配置
- .net core-利用PdfSharpCore和SkiaSharp.QrCode 添加PDF二维码页眉
- 云原生分布式 PostgreSQL+Citus 集群在 Sentry 后端的实践
- .net core -利用 BsonDocumentProjectionDefinition 和Lookup 进行 join 关联 MongoDB 查询
- 微服务系列之分布式日志 ELK
- .net lambda表达式合并
- .NET Core C#系列之XiaoFeng.Threading.JobScheduler作业调度