首页 程序笔记 Linux下RabbitMQ安装和.NET Core使用RabbitMQ.Client操作

Linux下RabbitMQ安装和.NET Core使用RabbitMQ.Client操作

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

Linux下RabbitMQ安装

RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

1.首先安装erlang

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

2.然后安装socat

yum install socat

3.最后安装RabbitMQ

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

RabbitMQ常用命令

启用Web控制台

rabbitmq-plugins enable rabbitmq_management

开启服务

systemctl start rabbitmq-server.service

停止服务

systemctl stop rabbitmq-server.service

查看服务状态

systemctl status rabbitmq-server.service

查看RabbitMQ状态

rabbitmqctl status

添加用户赋予管理员权限

rabbitmqctl  add_user  username  password

rabbitmqctl  set_user_tags  username  administrator

查看用户列表

rabbitmqctl list_users

删除用户

rabbitmqctl delete_user username

修改用户密码

rabbitmqctl oldPassword Username newPassword

访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

.NET Core 使用RabbitMQ

通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

定义生产者

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
	UserName = "admin",//用户名
	Password = "admin",//密码
	HostName = "192.168.157.130"//rabbitmq ip
};

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//声明一个队列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!");

string input;
do
{
	input = Console.ReadLine();

	var sendBytes = Encoding.UTF8.GetBytes(input);
	//发布消息
	channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower()!="exit");
channel.Close();
connection.Close();

定义消费者

	//创建连接工厂
	        ConnectionFactory factory = new ConnectionFactory
	        {
		        UserName = "admin",//用户名
		        Password = "admin",//密码
		        HostName = "192.168.157.130"//rabbitmq ip
	        };	        //创建连接
	        var connection = factory.CreateConnection();	        //创建通道
	        var channel = connection.CreateModel();			//事件基本消费者
			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);			//接收到消息事件
	        consumer.Received += (ch, ea) =>
	        {		        var message = Encoding.UTF8.GetString(ea.Body);
		        Console.WriteLine($"收到消息: {message}");				//确认该消息已被消费
		        channel.BasicAck(ea.DeliveryTag, false);
			};			//启动消费者 设置为手动应答消息
			channel.BasicConsume("hello", false, consumer);
	        Console.WriteLine("消费者已启动");
	        Console.ReadKey();
	        channel.Dispose();
	        connection.Close();

运行效果

启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

修改一下消费者的代码:

//接收到消息事件
consumer.Received += (ch, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body);

	Console.WriteLine($"收到消息: {message}");

	Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
	Thread.Sleep(10000);
	//确认该消息已被消费
	channel.BasicAck(ea.DeliveryTag, false);
	Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
};

运行效果

从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。

Direct Exchange

所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

//定义一个队列
channel.QueueDeclare(queueName, false, false, false, null);

//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);

运行效果

Fanout Exchange

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

static void Main(string[] args)
{
	string exchangeName = "TestFanoutChange";
	string queueName1 = "hello1";
	string queueName2 = "hello2";
	string routeKey = "";

	//创建连接工厂
	ConnectionFactory factory = new ConnectionFactory
	{
		UserName = "admin",//用户名
		Password = "admin",//密码
		HostName = "192.168.157.130"//rabbitmq ip
	};

	//创建连接
	var connection = factory.CreateConnection();
	//创建通道
	var channel = connection.CreateModel();

	//定义一个Direct类型交换机
	channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

	//定义队列1
	channel.QueueDeclare(queueName1, false, false, false, null);
	//定义队列2
	channel.QueueDeclare(queueName2, false, false, false, null);

	//将队列绑定到交换机
	channel.QueueBind(queueName1, exchangeName, routeKey, null);
	channel.QueueBind(queueName2, exchangeName, routeKey, null);

	//生成两个队列的消费者
	ConsumerGenerator(queueName1);
	ConsumerGenerator(queueName2);


	Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");

	string input;
	do
	{
		input = Console.ReadLine();

		var sendBytes = Encoding.UTF8.GetBytes(input);
		//发布消息
		channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

	} while (input.Trim().ToLower() != "exit");
	channel.Close();
	connection.Close();
}

/// <summary>
/// 根据队列名称生成消费者
/// </summary>
/// <param name="queueName"></param>
static void ConsumerGenerator(string queueName)
{
	//创建连接工厂
	ConnectionFactory factory = new ConnectionFactory
	{
		UserName = "admin",//用户名
		Password = "admin",//密码
		HostName = "192.168.157.130"//rabbitmq ip
	};

	//创建连接
	var connection = factory.CreateConnection();
	//创建通道
	var channel = connection.CreateModel();

	//事件基本消费者
	EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

	//接收到消息事件
	consumer.Received += (ch, ea) =>
	{
		var message = Encoding.UTF8.GetString(ea.Body);

		Console.WriteLine($"Queue:{queueName}收到消息: {message}");
		//确认该消息已被消费
		channel.BasicAck(ea.DeliveryTag, false);
	};
	//启动消费者 设置为手动应答消息
	channel.BasicConsume(queueName, false, consumer);
	Console.WriteLine($"Queue:{queueName},消费者已启动");
}

运行效果

Topic Exchange

所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.*” 只会匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常灵活。

string exchangeName = "TestTopicChange";
string queueName = "hello";
string routeKey = "TestRouteKey.*";

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
	UserName = "admin",//用户名
	Password = "admin",//密码
	HostName = "192.168.157.130"//rabbitmq ip
};

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

//定义队列1
channel.QueueDeclare(queueName, false, false, false, null);

//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);



Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");

string input;
do
{
	input = Console.ReadLine();

	var sendBytes = Encoding.UTF8.GetBytes(input);
	//发布消息
	channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

运行效果

Demo下载:DotNetCore.RabbitMQ

3

站心网

RabbitMQ简介AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准..

为您推荐

.NET 10 C# 14 必知的 6 大语法糖:提升开发效率,简洁优雅

.NET 10(搭配 C# 14)正式上线,带来一批令人惊喜的语法糖改进,让日常开发变得更加简洁、高效。无论你是编写企业级系统、构建性能敏感型组件,还是编写一次性脚本,这些新语法糖都能让你的代码更具可读性、减少..

2025年最佳.NET C#实现PDF转Word:主流库功能与对比

在日常工作中,将 PDF 文件高质量地转换为 Word 文档已成为许多企业和办公人员的常见需求,尤其是在文档归档、编辑流程自动化和办公系统集成等场景中尤为重要。对于使用 .NET 平台,特别是 C# 的开发者来说,选择一..

.NET Core 图像处理:Magick.NET 与 SkiaSharp 的全面对比

随着 .NET Core 的发展,传统的 System.Drawing 库因其对 Windows 的依赖性和在跨平台应用中的限制,逐渐被其他图像处理库所取代。在众多替代方案中,Magick.NET 和 SkiaSharp 是最受欢迎的两个选择。本文将从多个维..

使用.NET C#将图片转换为.ico图标文件的多种方法

在Windows应用程序开发中,图标(.ico)文件是不可或缺的一部分。本文将介绍如何使用.NET C#将常见的图片格式(如PNG、JPG、BMP)转换为.ico文件,并提供多种实现方式,包括使用System.Drawing、Magick.NET库的方法..

RevokeMsgPatcher:.NET开源、免费的Windows下PC版微信/QQ/TIM的防撤回补丁

今天给大家分享一款基于 .NET 开源、免费的适用于 Windows 下 PC 版微信/QQ/TIM的防撤回补丁(我已经看到了,撤回也没用了),通用的微信多开工具:RevokeMsgPatcher。RevokeMsgPatcher GitHub地址:https://github...

RabbitMQ 4.0+重大更新!.NET(C#)开发者必须掌握的6大升级要点

RabbitMQ 作为一款广受欢迎的消息队列中间件,近年来从 3.x 版本升级到 4.0+,带来了显著的功能增强和架构调整。与此同时,其官方 C# 客户端也从 6.x 版本跃升至 7.0,引入了全新的编程模型和性能优化。这些变化不仅..

Mysql跨库操作

在 MySQL 中,操作多数据源(例如从库 A 和库 B)进行联查的情况,可以分为以下两种场景:A 库和 B 库在同一个 MySQL 实例当 A 库和 B 库在同一个 MySQL 实例下时,可以直接使用跨库联查查询。示例如下:SELECT A.co..

Paylinks:基于现代 .NET 的跨平台第三方支付 SDK 详解与使用示例

Paylinks 是一套基于现代 .NET 开发的,支持跨平台、多商户的第三方支付SDK。该项目旨在简化开发者接入第三方支付平台的过程,特别是针对支付宝和微信支付,便于快速集成支付功能。Paylinks 提供了丰富的配置选项和..

.NET 使用 Qdrant.Client 连接向量数据库 Qdrant 的完整指南

随着向量数据库在 AI、搜索、推荐系统等领域的广泛应用,越来越多的开发者开始将 Qdrant 集成到自己的项目中。对于 .NET 开发者而言,使用 Qdrant.Client 实现与 Qdrant 的高效连接和数据操作,是构建语义搜索和嵌入..

Entity Framework(EF) Core 10新特性全面解析:提升开发效率的关键更新​

Entity Framework Core(EF Core)作为 .NET 平台的主流对象关系映射(ORM)框架,持续为开发者提供高效、灵活的数据访问解决方案。​在最新发布的 EF Core 10 中,微软引入了多项新特性,旨在简化数据库操作,提升..

.NET(C#)使用 iText7 高效处理PDF文件的全面指南​

在现代软件开发中,PDF 文件处理是一个常见且重要的需求。无论是生成报告、填充表单、添加水印,还是进行数字签名,选择一个功能强大的 PDF 库至关重要。iText7 作为一款开源且功能丰富的 PDF 操作库,广泛应用于 C#..

.NET Exception: Received an unexpected EOF or 0 bytes from the transport stream.解决方法

在 .NET 应用中试用HttpClient调用API异常报错“Received an unexpected EOF or 0 bytes from the transport stream,通常表示在进行 HTTPS 通信时,SSL/TLS 握手未能成功完成,导致连接被意外关闭。​以下是一..

微软退出中国对.NET开发人员有什么影响?

关于微软将停止在中国运营的报道,微软中国方面已明确表示该信息不实。网传邮件截图显示,“由于地缘政治及国际业务环境的变化,微软将调整其全球战略布局,并将于2025年4月8日起正式停止在中国区的运营”..

EasyCaching:一款灵活高效的 .NET 缓存库

EasyCaching 项目简介EasyCaching 是一个开源的 .NET 缓存抽象库,由 DotNetCore 团队开发,旨在为 .NET 应用提供简单、统一、强大且可扩展的缓存解决方案。它支持内存缓存(In-Memory)、Redis、Memcached、LiteDB..

.NET 依赖注入如何一个接口注册两种实现

在.NET的依赖注入(Dependency Injection,DI)系统中,一个接口注册两种或多种实现是常见的需求,尤其是在需要根据不同场景或条件选择不同实现时。以下是一些实现方法:1. 使用 IEnumerable<T> 解析所有实现这是最..

.NET C# 过滤从富文本编辑器html里的Javascript脚本

富文本编辑器在允许用户输入丰富内容的同时,也带来了跨站脚本攻击(XSS)的风险。过滤提交的 HTML 中的 <script> 脚本是防止跨站脚本攻击(XSS)的关键步骤。在 .NET C# 服务端过滤 <script> 脚本主要有以下几种方..

ZLinq:.NET 高性能 LINQ 替代方案及其使用指南

在 .NET 开发中,LINQ(Language Integrated Query)为数据查询提供了简洁且强大的语法。然而,传统的 LINQ 在处理大量数据时可能会引发性能瓶颈,主要由于频繁的内存分配和对象创建。为解决这一问题,Cysharp 团队..

.NET使用AutoMapper简化对象映射

在.NET软件开发中,常常需要将一个对象的数据转换并映射到另一个对象上。​这种手动映射的过程既繁琐又容易出错,影响开发效率和代码可维护性。​为了解决这一问题,AutoMapper应运而生。​什么是 AutoMapper?AutoM..

.NET C# RESTful API交互Refit库使用教程

Refit 是一个 .NET C# 库,它简化了与 RESTful API 的交互。Refit 受到 Square 的 Retrofit 库的启发,它将 REST API 转换为实时接口,允许你以声明方式定义 REST API 调用。Refit 的特点1. 声明式 API 定义:Refit ..

.NET C# System.Text.Json进阶使用技巧

System.Text.Json 是 .NET 中用于处理 JSON 数据的强大库。除了基本用法外,它还提供了许多进阶技巧,可以帮助你更高效、更灵活地处理 JSON 数据。以下是一些 System.Text.Json 的进阶使用技巧:1. 自定义序列化和反..

发表回复

返回顶部

微信分享

微信分享二维码

扫描二维码分享到微信或朋友圈

链接已复制