首页 程序笔记 Linux下RabbitMQ安装和.NET Core使用RabbitMQ.Client操作

Linux下RabbitMQ安装和.NET Core使用RabbitMQ.Client操作

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

Linux下RabbitMQ安装

RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

1.首先安装erlang

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

2.然后安装socat

yum install socat

3.最后安装RabbitMQ

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

RabbitMQ常用命令

启用Web控制台

rabbitmq-plugins enable rabbitmq_management

开启服务

systemctl start rabbitmq-server.service

停止服务

systemctl stop rabbitmq-server.service

查看服务状态

systemctl status rabbitmq-server.service

查看RabbitMQ状态

rabbitmqctl status

添加用户赋予管理员权限

rabbitmqctl  add_user  username  password

rabbitmqctl  set_user_tags  username  administrator

查看用户列表

rabbitmqctl list_users

删除用户

rabbitmqctl delete_user username

修改用户密码

rabbitmqctl oldPassword Username newPassword

访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

.NET Core 使用RabbitMQ

通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

定义生产者

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
	UserName = "admin",//用户名
	Password = "admin",//密码
	HostName = "192.168.157.130"//rabbitmq ip
};

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//声明一个队列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!");

string input;
do
{
	input = Console.ReadLine();

	var sendBytes = Encoding.UTF8.GetBytes(input);
	//发布消息
	channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower()!="exit");
channel.Close();
connection.Close();

定义消费者

	//创建连接工厂
	        ConnectionFactory factory = new ConnectionFactory
	        {
		        UserName = "admin",//用户名
		        Password = "admin",//密码
		        HostName = "192.168.157.130"//rabbitmq ip
	        };	        //创建连接
	        var connection = factory.CreateConnection();	        //创建通道
	        var channel = connection.CreateModel();			//事件基本消费者
			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);			//接收到消息事件
	        consumer.Received += (ch, ea) =>
	        {		        var message = Encoding.UTF8.GetString(ea.Body);
		        Console.WriteLine($"收到消息: {message}");				//确认该消息已被消费
		        channel.BasicAck(ea.DeliveryTag, false);
			};			//启动消费者 设置为手动应答消息
			channel.BasicConsume("hello", false, consumer);
	        Console.WriteLine("消费者已启动");
	        Console.ReadKey();
	        channel.Dispose();
	        connection.Close();

运行效果

启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

修改一下消费者的代码:

//接收到消息事件
consumer.Received += (ch, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body);

	Console.WriteLine($"收到消息: {message}");

	Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
	Thread.Sleep(10000);
	//确认该消息已被消费
	channel.BasicAck(ea.DeliveryTag, false);
	Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
};

运行效果

从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。

Direct Exchange

所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

//定义一个队列
channel.QueueDeclare(queueName, false, false, false, null);

//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);

运行效果

Fanout Exchange

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

static void Main(string[] args)
{
	string exchangeName = "TestFanoutChange";
	string queueName1 = "hello1";
	string queueName2 = "hello2";
	string routeKey = "";

	//创建连接工厂
	ConnectionFactory factory = new ConnectionFactory
	{
		UserName = "admin",//用户名
		Password = "admin",//密码
		HostName = "192.168.157.130"//rabbitmq ip
	};

	//创建连接
	var connection = factory.CreateConnection();
	//创建通道
	var channel = connection.CreateModel();

	//定义一个Direct类型交换机
	channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

	//定义队列1
	channel.QueueDeclare(queueName1, false, false, false, null);
	//定义队列2
	channel.QueueDeclare(queueName2, false, false, false, null);

	//将队列绑定到交换机
	channel.QueueBind(queueName1, exchangeName, routeKey, null);
	channel.QueueBind(queueName2, exchangeName, routeKey, null);

	//生成两个队列的消费者
	ConsumerGenerator(queueName1);
	ConsumerGenerator(queueName2);


	Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");

	string input;
	do
	{
		input = Console.ReadLine();

		var sendBytes = Encoding.UTF8.GetBytes(input);
		//发布消息
		channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

	} while (input.Trim().ToLower() != "exit");
	channel.Close();
	connection.Close();
}

/// <summary>
/// 根据队列名称生成消费者
/// </summary>
/// <param name="queueName"></param>
static void ConsumerGenerator(string queueName)
{
	//创建连接工厂
	ConnectionFactory factory = new ConnectionFactory
	{
		UserName = "admin",//用户名
		Password = "admin",//密码
		HostName = "192.168.157.130"//rabbitmq ip
	};

	//创建连接
	var connection = factory.CreateConnection();
	//创建通道
	var channel = connection.CreateModel();

	//事件基本消费者
	EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

	//接收到消息事件
	consumer.Received += (ch, ea) =>
	{
		var message = Encoding.UTF8.GetString(ea.Body);

		Console.WriteLine($"Queue:{queueName}收到消息: {message}");
		//确认该消息已被消费
		channel.BasicAck(ea.DeliveryTag, false);
	};
	//启动消费者 设置为手动应答消息
	channel.BasicConsume(queueName, false, consumer);
	Console.WriteLine($"Queue:{queueName},消费者已启动");
}

运行效果

Topic Exchange

所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.*” 只会匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常灵活。

string exchangeName = "TestTopicChange";
string queueName = "hello";
string routeKey = "TestRouteKey.*";

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
	UserName = "admin",//用户名
	Password = "admin",//密码
	HostName = "192.168.157.130"//rabbitmq ip
};

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

//定义队列1
channel.QueueDeclare(queueName, false, false, false, null);

//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);



Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");

string input;
do
{
	input = Console.ReadLine();

	var sendBytes = Encoding.UTF8.GetBytes(input);
	//发布消息
	channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

运行效果

Demo下载:DotNetCore.RabbitMQ

3

站星网

RabbitMQ简介AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准..

为您推荐

.NET Core 中替代 System.Drawing 的图像处理库:ImageSharp、SkiaSharp、Magick.NET 等对比分析

随着 .NET Core / .NET 6+ 平台对跨平台支持的加强,以及 System.Drawing.Common 在非 Windows 平台上的限制日益凸显,越来越多的开发者需要寻找合适的替代方案。微软从 .NET 6 起明确指出,System.Drawing.Common ..

Access to the path 'C:\Windows\TEMP\ASPNETCORE_xxx.tmp' is denied. 解决方法

.NET 网站上传文件时报错:System.InvalidOperationException: The exception handler configured on ExceptionHandlerOptions produced a 404 status response. This InvalidOperationException containing the ori..

如何显著提升 .NET 应用的启动速度:实用技巧与最佳实践

在现代软件环境下,用户对应用启动速度的容忍度非常低——启动过程若太慢,就可能损失首次体验和用户留存。对于 .NET 应用(包括 ASP.NET Core、桌面应用、服务程序等),启动性能优化是一项必须重视的工..

Blazor 与传统 MVC 对比详解:如何为你的 .NET 项目选择合适框架

在 .NET 世界里,Web 应用长期以来主要依靠 MVC(Model-View-Controller) 架构加上 Razor 视图渲染。但近年来随着前端交互需求增强、单页应用(SPA)趋势普及,微软推出 Blazor(支持在浏览器运行 C#)为 .NET 开发..

.NET Web API 文档库全对比:Swagger、NSwag、Scalar 选哪个?

在 .NET 生态中,Web API 已成为主流后端服务形式。对于 API 项目而言,良好的文档不仅能提升开发效率、易用性,还能支撑客户端、第三方接入、测试、运维、协作等环节。近年来,除了传统的 Swagger / Swashbuckle,..

在 ASP.NET Core 中:修改 appsettings 后程序会自动重启吗?详解与实践

在日常 ASP.NET Core 开发中,很多人疑惑:当修改 appsettings.json 或其他配置文件后,程序会自动重启吗?答案是:不一定。具体行为取决于托管环境、配置加载方式、以及代码中是否支持“热重载”或&ldquo..

如何使用 .NET 与 C# 利用 FluentFTP 库实现可靠的 FTP 文件传输

在许多企业系统与网络应用中,FTP(File Transfer Protocol)或 FTPS(FTP over SSL/TLS)仍然是文件传输的常见方案。使用标准的 FTP 客户端类固然可行,但在可靠性、可维护性与功能性上往往难以满足复杂需求。Fluen..

2025 年最新 .NET Redis 客户端库对比测评:性能、功能与适用场景解析

随着 .NET 应用对高性能分布式缓存与消息通讯需求不断提升,Redis 成为后端架构中的关键组件之一。然而,如何在 .NET 生态选择合适的 Redis 客户端库,却是一项需要深入考量的问题。本文从性能、功能扩展、安全许可..

.NET 中用 C# 构建布隆过滤器(Bloom Filter)实战教程

布隆过滤器是一种空间高效的概率型数据结构,常用于快速判断某元素绝对不存在,从而优化缓存、防止缓存穿透或数据库重复查询场景。尤其在 .NET 系统中,它能显著减少数据库或其他后端服务的压力。.NET 上常用的布隆..

git版本管理全流程命令操作

git完整使用流程:# 1. 从远程仓库拉取代码git clone <repo link># 2. 从mian分支创建其他分支git checkout -b <my_branch># 3. 查看main分支最新提交,但不合并(pull会合并merge)git fetch origin# 3. 从远程仓库..

Web前端入门第 81 问:JavaScript cookie 的读写操作

前端的 cookie 读写在 2020 年之前一直不存在一个官方的接口,每次需要使用 cookie 的时候,要么是引入三方插件,要么就需要自己封装一个公用的组件或函数。npm 的 cookie 插件周下载量 6 千万左右,可以想象一下此..

Linux 本地提权漏洞告急:CVE‑2025‑6018 & CVE‑2025‑6019 联合绕过至 root

近日,Qualys 威胁研究小组(TRU)披露了两项可链式利用的本地权限提升漏洞,首次揭示了攻击者如何从普通账户一步步直达 root 权限,对全球主流 Linux 发行版构成重大威胁。第一个漏洞编号为 CVE‑2025‑6018,存在..

.NET 10 C# 14 必知的 6 大语法糖:提升开发效率,简洁优雅

.NET 10(搭配 C# 14)正式上线,带来一批令人惊喜的语法糖改进,让日常开发变得更加简洁、高效。无论你是编写企业级系统、构建性能敏感型组件,还是编写一次性脚本,这些新语法糖都能让你的代码更具可读性、减少..

2025年最佳.NET C#实现PDF转Word:主流库功能与对比

在日常工作中,将 PDF 文件高质量地转换为 Word 文档已成为许多企业和办公人员的常见需求,尤其是在文档归档、编辑流程自动化和办公系统集成等场景中尤为重要。对于使用 .NET 平台,特别是 C# 的开发者来说,选择一..

.NET Core 图像处理:Magick.NET 与 SkiaSharp 的全面对比

随着 .NET Core 的发展,传统的 System.Drawing 库因其对 Windows 的依赖性和在跨平台应用中的限制,逐渐被其他图像处理库所取代。在众多替代方案中,Magick.NET 和 SkiaSharp 是最受欢迎的两个选择。本文将从多个维..

使用.NET C#将图片转换为.ico图标文件的多种方法

在Windows应用程序开发中,图标(.ico)文件是不可或缺的一部分。本文将介绍如何使用.NET C#将常见的图片格式(如PNG、JPG、BMP)转换为.ico文件,并提供多种实现方式,包括使用System.Drawing、Magick.NET库的方法..

RevokeMsgPatcher:.NET开源、免费的Windows下PC版微信/QQ/TIM的防撤回补丁

今天给大家分享一款基于 .NET 开源、免费的适用于 Windows 下 PC 版微信/QQ/TIM的防撤回补丁(我已经看到了,撤回也没用了),通用的微信多开工具:RevokeMsgPatcher。RevokeMsgPatcher GitHub地址:https://github...

RabbitMQ 4.0+重大更新!.NET(C#)开发者必须掌握的6大升级要点

RabbitMQ 作为一款广受欢迎的消息队列中间件,近年来从 3.x 版本升级到 4.0+,带来了显著的功能增强和架构调整。与此同时,其官方 C# 客户端也从 6.x 版本跃升至 7.0,引入了全新的编程模型和性能优化。这些变化不仅..

Mysql跨库操作

在 MySQL 中,操作多数据源(例如从库 A 和库 B)进行联查的情况,可以分为以下两种场景:A 库和 B 库在同一个 MySQL 实例当 A 库和 B 库在同一个 MySQL 实例下时,可以直接使用跨库联查查询。示例如下:SELECT A.co..

Paylinks:基于现代 .NET 的跨平台第三方支付 SDK 详解与使用示例

Paylinks 是一套基于现代 .NET 开发的,支持跨平台、多商户的第三方支付SDK。该项目旨在简化开发者接入第三方支付平台的过程,特别是针对支付宝和微信支付,便于快速集成支付功能。Paylinks 提供了丰富的配置选项和..

发表回复

返回顶部

微信分享

微信分享二维码

扫描二维码分享到微信或朋友圈

链接已复制
蜂鸟影院2048影视资源论坛熊猫影视河马影视星辰影视萝卜影院八哥电影网人人看电影无忧影视网橙子影视网叮当影视网天天影视网青青影视网电影天堂开心追剧网西瓜影院麻花影视网70影视网年钻网茶小舍电影藏影堂新神州影域煮酒观影体积影视爱看影院星光电影至尊影院极影公社超清视界