MassTransit | .NET 分布式应用框架

引言

MassTransit | .NET 分布式应用框架

文章插图
A free, open-source distributed application framework for .NET. 一个免费、开源的.NET 分布式应用框架 。-- MassTransit 官网
MassTransit,直译公共交通,是由Chris Patterson开发的基于消息驱动的.NET 分布式应用框架,其核心思想是借助消息来实现服务之间的松耦合异步通信,进而确保应用更高的可用性、可靠性和可扩展性 。通过对消息模型的高度抽象 , 以及对主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大简化了基于消息驱动的开发门槛,同时内置了连接管理、消息序列化和消费者生命周期管理 , 以及诸如重试、限流、断路器等异常处理机制,让开发者更好的专注于业务实现 。简而言之 , MassTransit实现了消息代理透明化 。无需面向消息代理编程进行诸如连接管理、队列的申明和绑定等操作 , 即可轻松实现应用间消息的传递和消费 。
快速体验空口无凭,创建一个项目快速体验一下 。
  1. 基于worker模板创建一个基础项目:dotnet new worker -n MassTransit.Demo
  2. 打开项目,添加NuGet包:MassTransit
  3. 定义订单创建事件消息契约:
using System;namespace MassTransit.Demo{public record OrderCreatedEvent{public Guid OrderId { get; set; }}}
  1. 修改Worker类,发送订单创建事件:
namespace MassTransit.Demo;public class Worker : BackgroundService{readonly IBus _bus;//注册总线public Worker(IBus bus){_bus = bus;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){//模拟并发送订单创建事件await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);await Task.Delay(1000, stoppingToken);}}}
  1. 仅需实现IConsumer<OrderCreatedEvent>泛型接口 , 即可实现消息的订阅:
public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>{private readonly ILogger<OrderCreatedEventConsumer> _logger;public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger){_logger = logger;}public Task Consume(ConsumeContext<OrderCreatedEvent> context){_logger.LogInformation($"Received Order:{context.Message.OrderId}");return Task.CompletedTask;}}
  1. 注册服务:
using MassTransit;using MassTransit.Demo;IHost host = Host.CreateDefaultBuilder(args).ConfigureServices(services =>{services.AddHostedService<Worker>();services.AddMassTransit(configurator =>{//注册消费者configurator.AddConsumer<OrderCreatedEventConsumer>();//使用基于内存的消息路由传输configurator.UsingInMemory((context, cfg) =>{cfg.ConfigureEndpoints(context);});});}).Build();await host.RunAsync();
  1. 运行项目,一个简单的进程内事件发布订阅的应用就完成了 。
如果需要使用RabbitMQ 消息代理进行消息传输,则仅需安装MassTransit.RabbitMQNuGet包 , 然后指定使用RabbitMQ 传输消息即可 。
using MassTransit;using MassTransit.Demo;IHost host = Host.CreateDefaultBuilder(args).ConfigureServices(services =>{services.AddHostedService<Worker>();services.AddMassTransit(configurator =>{configurator.AddConsumer<OrderCreatedEventConsumer>();// configurator.UsingInMemory((context, cfg) =>// {//cfg.ConfigureEndpoints(context);// });configurator.UsingRabbitMq((context, cfg) =>{cfg.Host(host: "localhost",port: 5672,virtualHost: "/",configure: hostConfig =>{hostConfig.Username("guest");hostConfig.Password("guest");});cfg.ConfigureEndpoints(context);});});}).Build();await host.RunAsync();运行项目 , MassTransit会自动在指定的RabbitMQ上创建一个类型为fanoutMassTransit.Demo.OrderCreatedEventExchange和一个与OrderCreatedEvent同名的队列进行消息传输,如下图所示 。
MassTransit | .NET 分布式应用框架

文章插图
核心概念MassTranist 为了实现消息代理的透明化和应用间消息的高效传输,抽象了以下概念,其中消息流转流程如下图所示:
  1. Message:消息契约,定义了消息生产者和消息消费者之间的契约 。
  2. Producer:生产者,发送消息的一方都可以称为生产者 。
  3. SendEndpoint:发送端点,用于将消息内容序列化,并发送到传输模块 。
  4. Transport:传输模块,消息代理透明化的核心,用于和消息代理通信,负责发送和接收消息 。
  5. ReceiveEndpoint:接收端点,用于从传输模块接收消息,反序列化消息内容,并将消息路由到消费者 。

    推荐阅读