首页 程序笔记 ASP.NET Core微服务架构中使用RabbitMQ实现CQRS模式

ASP.NET Core微服务架构中使用RabbitMQ实现CQRS模式

微服务架构代表了软件设计的范式转变,将大型单体应用程序分解为更小的、可管理的服务,这些服务独立运行并通过定义良好的 API 进行通信。

微服务架构概述

在 C# 中,微服务可以是更大系统的一部分:

using System;
using Microsoft.AspNetCore.Mvc;
[Route("api/[controller]")]
[ApiController]
public class UserController : ControllerBase
{
    [HttpGet]
    public ActionResult<string> GetUser()
    {
        // Logic to fetch user data from a database or external service
        return "User data";
    }
}

在此代码片段中,UserController 公开了一个 HTTP GET 终结点来检索用户数据,展示了此微服务的单一职责。

命令查询责任分离 (CQRS) 模式的说明

CQRS 从根本上将处理命令(更改系统状态)和查询(在不修改状态的情况下检索数据)的责任分开。这种隔离允许针对每种类型的操作进行优化。

// Example of Command and Query models in C#
public class Command
{
    public string Id { get; set; }
    public object Payload { get; set; }
}

public class Query
{
    public string Id { get; set; }
}
// Command Handler
public class CommandHandler
{
    public void HandleCommand(Command command)
    {
        // Logic to process and update the system state based on the command
    }
}
// Query Handler
public class QueryHandler
{
    public object HandleQuery(Query query)
    {
        // Logic to retrieve and return data without altering the system state
        return null;
    }
}

这种分为具有专用处理程序的命令和查询模型的过程简化了代码库,并实现了对写入和读取操作的定制优化。

消息代理(如 RabbitMQ)在微服务中的重要性

以 RabbitMQ 为例,消息代理在微服务架构中至关重要,为服务之间的异步通信提供了健壮的机制。它们在不同组件之间的通信中实现了解耦、可靠性和可扩展性。

// Example of using RabbitMQ with RabbitMQ.Client in C#
using RabbitMQ.Client;
class RabbitMQService
{
    public void SendMessageToQueue(string queueName, string message)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
        Console.WriteLine($"Message sent to {queueName}: {message}");
    }
}

在上述代码片段中,RabbitMQ 用于将消息发送到特定队列,从而确保微服务之间的可靠通信。

CQRS原理解释

CQRS(命令查询责任分离)是一种体系结构模式,它主张将应用程序中的读取和写入操作之间明确分离。它区分用于读取数据(查询)的模型和逻辑以及用于修改数据(命令)的模型和逻辑。

// Example of Command and Query models in C#
public class Command
{
    public string Id { get; set; }
    public object Payload { get; set; }
}
public class Query
{
    public string Id { get; set; }
}

在上述示例中,Command 和 Query 类分别表示用于处理写入和读取操作的不同模型。这种分离有助于隔离关注点并简化每种操作类型的逻辑。

分离读取和写入操作的优点

分离读取和写入操作具有以下几个优点:

优化:不同的模型可以针对其特定任务进行优化。例如,可以定制查询模型以实现快速读取。 可扩展性:系统可以独立扩展读取和写入,从而优化性能。 灵活性:修改写入逻辑不会影响读取操作,反之亦然,从而在设计和演进方面提供更大的灵活性。
// Command Handler
public class CommandHandler
{
    public void HandleCommand(Command command)
    {
        // Logic to process and update the system state based on the command
    }
}

// Query Handler
public class QueryHandler
{
    public object HandleQuery(Query query)
    {
        // Logic to retrieve and return data without altering the system state
        return null;
    }
}

将命令和查询的处理分开,可以根据每个操作的特定要求定制专用逻辑。

CQRS 大放异彩的用例和场景

CQRS 在以下情况下特别有用:

复杂域:处理复杂业务逻辑的系统受益于 CQRS 提供的关注点分离。 性能优化:需要针对高吞吐量读取和写入操作进行优化的应用程序。 事件溯源:CQRS 通过提供单独的模型来处理基于事件的命令和查询,从而补充事件溯源。 事件溯源:CQRS 通过提供单独的模型来处理基于事件的命令和查询,从而补充事件溯源。
// Example of using CQRS with Event Sourcing
public class EventSourcingHandler
{
    public void ApplyEvent(Event event)
    {
        // Logic to apply an event and update the system state
    }
    public object GetState()
    {
        // Logic to reconstruct the system state based on events for query purposes
        return null;
    }
}

通过将 CQRS 与事件溯源结合使用,应用程序可以在处理命令/事件和查询数据之间保持明确的分离。

RabbitMQ 作为消息代理的概述

RabbitMQ 是一个健壮的开源消息代理,可促进分布式应用程序之间的通信。它充当中介,使应用程序的各个组件能够无缝通信和传输数据。

// Example of using RabbitMQ with RabbitMQ.Client in C#
using RabbitMQ.Client;
class RabbitMQService
{
    public void SendMessageToQueue(string queueName, string message)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
        Console.WriteLine($"Message sent to {queueName}: {message}");
    }
}

在上面的代码中,描述了一个类,展示了如何使用 RabbitMQ 的客户端库将消息发送到特定队列。

微服务架构的主要特性和优势

RabbitMQ 提供了几个特性,使其非常适合微服务架构:

可靠性:它确保消息传递并支持消息确认机制。 灵活性:支持各种消息传递模式(发布/订阅、点对点)和协议(AMQP、MQTT)。 可扩展性:通过在多个节点或集群之间分发消息,允许水平扩展。
// Example of using RabbitMQ for Publish-Subscribe
public class Publisher
{
    public void Publish(string exchangeName, string message)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
        Console.WriteLine($"Message published to {exchangeName}: {message}");
    }
}

该上述代码中通过向交易所发布消息来演示 RabbitMQ 的发布-订阅功能。

RabbitMQ 如何促进异步通信?

RabbitMQ 通过解耦发送方和接收方组件来促进异步通信,允许它们独立运行。它通过消息队列实现这一点,确保消息在应用程序的不同部分之间可靠地传递。

// Example of consuming messages from a RabbitMQ queue
class Consumer
{
    public void ConsumeFromQueue(string queueName)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine($"Message received from {queueName}: {message}");
        };
        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    }
}

该类演示如何使用来自特定队列的消息,从而允许组件异步处理收到的消息。

将 RabbitMQ 与 CQRS 集成

将 CQRS 与 RabbitMQ 结合使用时的设计注意事项

将 CQRS 与 RabbitMQ 集成时,应考虑以下几个设计注意事项:

消息结构:以清晰、一致的格式设计命令和事件的消息。 错误处理:实施用于处理消息处理中的错误和重试的策略。 消息持久性: 配置队列以确保消息持久性,以避免数据丢失。 可扩展性:通过考虑 RabbitMQ 集群和负载均衡来规划可伸缩性。

使用 RabbitMQ 作为消息传递主干的命令处理

使用 RabbitMQ 作为命令处理的消息传递主干涉及将命令发送到队列,这些命令将由处理程序使用进行处理。

如在一个在线订购系统的场景中,将 RabbitMQ 与 C# 中的 CQRS 集成以异步处理订单:

场景:

在在线订购系统中,当下达新订单时,需要异步处理。我们将使用 RabbitMQ 来处理命令(下订单)和事件(订单处理)。系统将按照 CQRS 原则使用队列分离命令和事件。

设计注意事项:

订单命令:表示用于下订单的命令。 订单事件:表示已处理订单的事件。 错误处理:对失败的订单实施重试机制。

命令处理:

public class OrderCommandHandler
{
    private readonly string commandQueueName = "order_commands";

    public void SendOrderCommand(OrderCommand command)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: commandQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(command));
        channel.BasicPublish(exchange: "", routingKey: commandQueueName, basicProperties: null, body: body);
        Console.WriteLine($"Order command sent: {JsonConvert.SerializeObject(command)}");
    }
    public void ConsumeOrderCommands()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: commandQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var commandMessage = Encoding.UTF8.GetString(body);
            var orderCommand = JsonConvert.DeserializeObject<OrderCommand>(commandMessage);

            // Process the order command
            Task.Run(() => ProcessOrderCommand(orderCommand));

            // Acknowledge the message
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queue: commandQueueName, autoAck: false, consumer: consumer);
    }
    private void ProcessOrderCommand(OrderCommand orderCommand)
    {
        // Logic to process the order command asynchronously
        Console.WriteLine($"Processing order command: {JsonConvert.SerializeObject(orderCommand)}");
        
        // Place order, perform validation, etc.
        // If successful, publish an order processed event
        var orderEvent = new OrderEvent { OrderId = orderCommand.OrderId, Status = "Processed" };
        SendOrderProcessedEvent(orderEvent);
    }
    private void SendOrderProcessedEvent(OrderEvent orderEvent)
    {
        var eventQueueName = "order_events";
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: eventQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderEvent));
        channel.BasicPublish(exchange: "", routingKey: eventQueueName, basicProperties: null, body: body);
        Console.WriteLine($"Order processed event sent: {JsonConvert.SerializeObject(orderEvent)}");
    }
}

实现命令和事件的消息队列

在集成了 RabbitMQ 的基于 CQRS 的系统中,为命令和事件建立了单独的队列,以实现组件之间的异步通信。

public class OrderEventConsumer
{
    private readonly string eventQueueName = "order_events";

    public void ConsumeOrderEvents()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: eventQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var eventMessage = Encoding.UTF8.GetString(body);
            var orderEvent = JsonConvert.DeserializeObject<OrderEvent>(eventMessage);
            Console.WriteLine($"Received order processed event: {JsonConvert.SerializeObject(orderEvent)}");
            // Logic to handle the processed order event
        };
        channel.BasicConsume(queue: eventQueueName, autoAck: true, consumer: consumer);
    }
}

异步通信和事件驱动架构

RabbitMQ 允许组件以非阻塞方式对事件和消息做出反应,从而促进事件驱动架构中的异步通信。

public class Program
{
    public static void Main(string[] args)
    {
        var orderCommandHandler = new OrderCommandHandler();
        var orderEventConsumer = new OrderEventConsumer();

        // Example: Sending an order command
        var orderCommand = new OrderCommand { OrderId = Guid.NewGuid(), Product = "Product A", Quantity = 2 };
        orderCommandHandler.SendOrderCommand(orderCommand);

        // Consume order commands and events asynchronously
        Task.Run(() => orderCommandHandler.ConsumeOrderCommands());
        Task.Run(() => orderEventConsumer.ConsumeOrderEvents());
        Console.ReadLine(); // Keep the application running
    }
}

在微服务中使用 RabbitMQ 实现 CQRS

设置微服务基础架构

为简单起见,我们创建两个微服务:一个用于处理命令 (OrderCommandService),另一个用于处理查询 (OrderQueryService)。每个服务都将处理 CQRS 模式的特定方面。

订单命令服务

// OrderCommandService: Handles commands (placing orders)
public class OrderCommandService
{
    private readonly string commandQueueName = "order_commands";
    public void SendOrderCommand(OrderCommand command)
    {
        // Code to send order command to RabbitMQ queue (similar to previously shown CommandSender)
    }
    public void ConsumeOrderCommands()
    {
        // Code to consume order commands from RabbitMQ queue (similar to previously shown CommandConsumer)
        // Process received commands asynchronously and trigger events accordingly
    }
}

订单查询服务

// OrderQueryService: Handles queries (fetching orders)
public class OrderQueryService
{
    private readonly string queryQueueName = "order_queries";
    public void SendOrderQuery(Query query)
    {
        // Code to send order query to RabbitMQ queue (similar to previously shown CommandSender)
    }
    public void ConsumeOrderQueries()
    {
        // Code to consume order queries from RabbitMQ queue (similar to previously shown CommandConsumer)
        // Process received queries and retrieve orders data asynchronously
    }
}

在微服务中定义命令和查询模型

命令和查询模型

// Command model
public class OrderCommand
{
    public string OrderId { get; set; }
    // Other order-related properties...
}
// Query model
public class OrderQuery
{
    public string QueryId { get; set; }
    // Other query-related properties...
}

使用 RabbitMQ 编写命令、处理事件和查询

订单命令服务

// Sending order commands
OrderCommandService orderCommandService = new OrderCommandService();
OrderCommand orderCommand = new OrderCommand { OrderId = "123", /* Other order properties */ };
orderCommandService.SendOrderCommand(orderCommand);
// Consuming order commands
orderCommandService.ConsumeOrderCommands();

订单查询服务

// Sending order queries
OrderQueryService orderQueryService = new OrderQueryService();
OrderQuery orderQuery = new OrderQuery { QueryId = "456", /* Other query properties */ };
orderQueryService.SendOrderQuery(orderQuery);
// Consuming order queries
orderQueryService.ConsumeOrderQueries();

确保数据一致性和最终一致性

实现数据一致性和最终一致性将涉及其他步骤,例如使用事件溯源、使用一致的读取模型进行查询,以及确保正确处理微服务中的事件。

在此只是演示了如何使用 CQRS 和 RabbitMQ 的在线订购系统微服务的基本设置,概述了用于处理命令和查询的服务之间的结构和交互。在实际应用场景中,还要实现完全数据一致性和最终一致性,通常需要对事件、数据存储机制和错误恢复策略进行更复杂的处理。

在微服务架构中使用 RabbitMQ 实现 CQRS 提供了一种强大的方法来构建可扩展的解耦系统,从而高效处理复杂的操作。

2

站星网

微服务架构代表了软件设计的范式转变,将大型单体应用程序分解为更小的、可管理的服务,这些服务独立运行并..

为您推荐

夜莺监控设计思考(一)整体定位、架构设计、单进程多进程选择、高可用设计

这将是一个系列,讲解 夜莺监控 的设计思考,可以理解为原理+最佳实践+产品设计时的折中取舍。整体定位了解一个开源项目,最应该了解的就是其定位,或者说它要解决的问题域。夜莺的定位就是四个字:告警引擎。夜莺对..

.NET Core 中替代 System.Drawing 的图像处理库:ImageSharp、SkiaSharp、Magick.NET 等对比分析

随着 .NET Core / .NET 6+ 平台对跨平台支持的加强,以及 System.Drawing.Common 在非 Windows 平台上的限制日益凸显,越来越多的开发者需要寻找合适的替代方案。微软从 .NET 6 起明确指出,System.Drawing.Common ..

Access to the path 'C:\Windows\TEMP\ASPNETCORE_xxx.tmp' is denied. 解决方法

.NET 网站上传文件时报错:System.InvalidOperationException: The exception handler configured on ExceptionHandlerOptions produced a 404 status response. This InvalidOperationException containing the ori..

如何显著提升 .NET 应用的启动速度:实用技巧与最佳实践

在现代软件环境下,用户对应用启动速度的容忍度非常低——启动过程若太慢,就可能损失首次体验和用户留存。对于 .NET 应用(包括 ASP.NET Core、桌面应用、服务程序等),启动性能优化是一项必须重视的工..

Blazor 与传统 MVC 对比详解:如何为你的 .NET 项目选择合适框架

在 .NET 世界里,Web 应用长期以来主要依靠 MVC(Model-View-Controller) 架构加上 Razor 视图渲染。但近年来随着前端交互需求增强、单页应用(SPA)趋势普及,微软推出 Blazor(支持在浏览器运行 C#)为 .NET 开发..

.NET Web API 文档库全对比:Swagger、NSwag、Scalar 选哪个?

在 .NET 生态中,Web API 已成为主流后端服务形式。对于 API 项目而言,良好的文档不仅能提升开发效率、易用性,还能支撑客户端、第三方接入、测试、运维、协作等环节。近年来,除了传统的 Swagger / Swashbuckle,..

在 ASP.NET Core 中:修改 appsettings 后程序会自动重启吗?详解与实践

在日常 ASP.NET Core 开发中,很多人疑惑:当修改 appsettings.json 或其他配置文件后,程序会自动重启吗?答案是:不一定。具体行为取决于托管环境、配置加载方式、以及代码中是否支持“热重载”或&ldquo..

TikTok Shop黑五备战枪响,卖家开启“赶场”模式

来源:TT123跨境电商作者:TT123跨境电商2007 年,化身牛仔的周杰伦很忙,2025 年,在跨境电商平台大促之间的反复横跳的跨境卖家,更忙。随着 Temu、TikTok Shop、亚马逊等多家跨境平台官宣年终大促的节奏,卖家们日..

如何使用 .NET 与 C# 利用 FluentFTP 库实现可靠的 FTP 文件传输

在许多企业系统与网络应用中,FTP(File Transfer Protocol)或 FTPS(FTP over SSL/TLS)仍然是文件传输的常见方案。使用标准的 FTP 客户端类固然可行,但在可靠性、可维护性与功能性上往往难以满足复杂需求。Fluen..

2025 年最新 .NET Redis 客户端库对比测评:性能、功能与适用场景解析

随着 .NET 应用对高性能分布式缓存与消息通讯需求不断提升,Redis 成为后端架构中的关键组件之一。然而,如何在 .NET 生态选择合适的 Redis 客户端库,却是一项需要深入考量的问题。本文从性能、功能扩展、安全许可..

.NET 中用 C# 构建布隆过滤器(Bloom Filter)实战教程

布隆过滤器是一种空间高效的概率型数据结构,常用于快速判断某元素绝对不存在,从而优化缓存、防止缓存穿透或数据库重复查询场景。尤其在 .NET 系统中,它能显著减少数据库或其他后端服务的压力。.NET 上常用的布隆..

大动作!京东推出半托管模式,带1000个品牌出海!

来源:跨境电商头条作者:Joey最近,外卖市场格外热闹,许多朋友直呼免费奶茶喝不动了,而在其中主导这次外卖大战的京东,又刷屏了最近的跨境电商圈。这个曾在出海路上“起大早赶晚集”的巨头,7 月正式推出海外仓半..

微服务架构学习与思考:SOA架构与微服务架构对比分析?它们之间区别是什么?

我现在把微服务架构所有的博客文章也发布到了 github 上,便于阅读(左边栏打开可以看到全部的标题),还有历史修改追踪。当然也希望大家能点个✨ 星 star 鼓励鼓励。什么是 SOA 架构#SOA(Service-Oriented Architect..

.NET 10 C# 14 必知的 6 大语法糖:提升开发效率,简洁优雅

.NET 10(搭配 C# 14)正式上线,带来一批令人惊喜的语法糖改进,让日常开发变得更加简洁、高效。无论你是编写企业级系统、构建性能敏感型组件,还是编写一次性脚本,这些新语法糖都能让你的代码更具可读性、减少..

2025年最佳.NET C#实现PDF转Word:主流库功能与对比

在日常工作中,将 PDF 文件高质量地转换为 Word 文档已成为许多企业和办公人员的常见需求,尤其是在文档归档、编辑流程自动化和办公系统集成等场景中尤为重要。对于使用 .NET 平台,特别是 C# 的开发者来说,选择一..

.NET Core 图像处理:Magick.NET 与 SkiaSharp 的全面对比

随着 .NET Core 的发展,传统的 System.Drawing 库因其对 Windows 的依赖性和在跨平台应用中的限制,逐渐被其他图像处理库所取代。在众多替代方案中,Magick.NET 和 SkiaSharp 是最受欢迎的两个选择。本文将从多个维..

使用.NET C#将图片转换为.ico图标文件的多种方法

在Windows应用程序开发中,图标(.ico)文件是不可或缺的一部分。本文将介绍如何使用.NET C#将常见的图片格式(如PNG、JPG、BMP)转换为.ico文件,并提供多种实现方式,包括使用System.Drawing、Magick.NET库的方法..

RevokeMsgPatcher:.NET开源、免费的Windows下PC版微信/QQ/TIM的防撤回补丁

今天给大家分享一款基于 .NET 开源、免费的适用于 Windows 下 PC 版微信/QQ/TIM的防撤回补丁(我已经看到了,撤回也没用了),通用的微信多开工具:RevokeMsgPatcher。RevokeMsgPatcher GitHub地址:https://github...

RabbitMQ 4.0+重大更新!.NET(C#)开发者必须掌握的6大升级要点

RabbitMQ 作为一款广受欢迎的消息队列中间件,近年来从 3.x 版本升级到 4.0+,带来了显著的功能增强和架构调整。与此同时,其官方 C# 客户端也从 6.x 版本跃升至 7.0,引入了全新的编程模型和性能优化。这些变化不仅..

Web前端入门第 55 问:JavaScript 严格模式与非严格模式区别

JavaScript 默认是非严格模式的,可以通过 "use strict"; 启用严格模式。此声明语句可以放在 JS 文件顶部,也可以放在函数内部。启用严格模式1、外部脚本在 JS 文件开头声明,内部脚本在 <script> 标签开头声明,声..

发表回复

返回顶部

微信分享

微信分享二维码

扫描二维码分享到微信或朋友圈

链接已复制
蜂鸟影院2048影视资源论坛熊猫影视河马影视星辰影视萝卜影院八哥电影网人人看电影无忧影视网橙子影视网叮当影视网天天影视网青青影视网电影天堂开心追剧网西瓜影院麻花影视网70影视网年钻网茶小舍电影藏影堂新神州影域煮酒观影体积影视爱看影院星光电影至尊影院极影公社超清视界