登录/注册
梧桐雨
2256
占位
1
占位
0
浏览量
占位
粉丝
占位
关注
十五、.net core(.NET 6)搭建RabbitMQ消息队列生产者和消费者的简单方法
梧桐雨
2021-06-19 15:22:42 2021-06-19
113
0

 搭建RabbitMQ简单通用的直连方法

 

如果还没有

MQ

环境,可以参考上一篇的博客

https://www.cnblogs.com/weskynet/p/14877932.html

 

接下来开始

.net core

操作

Rabbitmq

有关的内容。我打算使用比较简单的单机的

direct

直连模式,来演示一下有关操作,基本套路差不多。

首先,我在我的

package

包项目上面,添加对

RabbitMQ.Client

的引用:

 

 

Common

文件夹下,新建类库项目 

Wsk.Core.RabbitMQ,

并且引用

package

项目:

 

 

在启动项目下的

appsettings

配置文件里面,新增一个访问

RabbitMQ

的配置信息:

 

 

配置部分代码:

"MQ": [
{
"Host": "127.0.0.1", // MQ安装的实际服务器IP地址
"Port": 5672, // 服务端口号
"User": "wesky", // 用户名
"Password": "wesky123", // 密码
"ExchangeName": "WeskyExchange", // 设定一个Exchange名称,
"Durable": true // 是否启用持久化
}
]

 

然后,在实体类项目下,新建实体类

MqConfigInfo

,用于把读取的配置信息赋值到该实体类下:

 

实体类代码:

public class MqConfigInfo
{
public string Host { get; set; }
public int Port { get; set; }
public string User { get; set; }
public string Password { get; set; }
public string ExchangeName { get; set; }
public bool Durable { get; set; }
}

View Code

 

在刚刚新建的

RabbitMQ

类库项目下面,引用该实体类库项目,以及

APppSettings

项目。然后新建一个类,叫做

ReadMqConfigHelper

,以及它的

interface

接口,并且提供一个方法,叫

ReadMqConfig

,用来进行读取配置信息使用:

读取配置信息类代码:

public class ReadMqConfigHelper:IReadMqConfigHelper
{
private readonly ILogger<ReadMqConfigHelper> _logger;
public ReadMqConfigHelper(ILogger<ReadMqConfigHelper> logger)
{
_logger = logger;
}
public List<MqConfigInfo> ReadMqConfig()
{
try
{
List<MqConfigInfo> config = AppHelper.ReadAppSettings<MqConfigInfo>(new string[] { "MQ" }); // 读取MQ配置信息
if (config.Any())
{
return config;
}
_logger.LogError($"获取MQ配置信息失败:没有可用数据集");
return null;
}
catch (Exception ex)
{
_logger.LogError($"获取MQ配置信息失败:{ex.Message}");
return null;
}
}
}

View Code

 

接着,新建类

MqConnectionHelper

以及接口

IMqConnectionHelper

,用于做

MQ

连接、创建生产者和消费者等有关操作

:

 

 

然后,新增一系列创建连接所需要的静态变量:

 

 

然后,设置两个消费者队列,用来测试。以及添加生产者连接有关的配置和操作:

 

 

然后,创建消费者连接方法:

 

 

其中,

StartListener

下面提供了事件,用于手动确认消息接收。如果设置为自动,有可能导致消息丢失:

 

 

然后,添加消息发布方法:

 

 

interface

接口里面,添加有关的接口,用于等下依赖注入使用:

 

 

连接类部分的代码:

public class MqConnectionHelper:IMqConnectionHelper
{
private readonly ILogger<MqConnectionHelper> _logger;
public MqConnectionHelper(ILogger<MqConnectionHelper> logger)
{
_logger = logger;
_connectionReceiveFactory = new IConnectionFactory[_costomerCount];
_connectionReceive = new IConnection[_costomerCount];
_modelReceive = new IModel[_costomerCount];
_basicConsumer = new EventingBasicConsumer[_costomerCount];
}
/*
备注:使用数组的部分,是给消费端用的。目前生产者只设置了一个,消费者可能存在多个。
当然,有条件的还可以上RabbitMQ集群进行处理,会更好玩一点。
*/
private static IConnectionFactory _connectionSendFactory; //RabbitMQ工厂 发送端
private static IConnectionFactory[] _connectionReceiveFactory; //RabbitMQ工厂 接收端
private static IConnection _connectionSend; //连接 发送端
private static IConnection[] _connectionReceive; //连接 消费端
public static List<MqConfigInfo> _mqConfig; // 配置信息
private static IModel _modelSend; //通道 发送端
private static IModel[] _modelReceive; //通道 消费端
private static EventingBasicConsumer[] _basicConsumer; // 事件
/* 设置两个routingKey 和 队列名称,用来做测试使用*/
public static int _costomerCount = 2;
public static string[] _routingKey = new string[] {"WeskyNet001","WeskyNet002" };
public static string[] _queueName = new string[] { "Queue001", "Queue002" };
/// <summary>
/// 生产者初始化连接配置
/// </summary>
public void SendFactoryConnectionInit()
{
_connectionSendFactory = new ConnectionFactory
{
HostName = _mqConfig.FirstOrDefault().Host,
Port = _mqConfig.FirstOrDefault().Port,
UserName = _mqConfig.FirstOrDefault().User,
Password = _mqConfig.FirstOrDefault().Password
};
}
/// <summary>
/// 生产者连接
/// </summary>
public void SendFactoryConnection()
{
if (null != _connectionSend && _connectionSend.IsOpen)
{
return; // 已有连接
}
_connectionSend = _connectionSendFactory.CreateConnection(); // 创建生产者连接
if (null != _modelSend && _modelSend.IsOpen)
{
return; // 已有通道
}
_modelSend = _connectionSend.CreateModel(); // 创建生产者通道
_modelSend.ExchangeDeclare(_mqConfig.FirstOrDefault().ExchangeName, ExchangeType.Direct); // 定义交换机名称和类型(direct)
}
/// <summary>
/// 消费者初始化连接配置
/// </summary>
public void ReceiveFactoryConnectionInit()
{
var factories = new ConnectionFactory
{
HostName = _mqConfig.FirstOrDefault().Host,
Port = _mqConfig.FirstOrDefault().Port,
UserName = _mqConfig.FirstOrDefault().User,
Password = _mqConfig.FirstOrDefault().Password
};
for (int i = 0; i < _costomerCount; i++)
{
_connectionReceiveFactory[i] = factories; // 给每个消费者绑定一个连接工厂
}
}
/// <summary>
/// 消费者连接
/// </summary>
/// <param name="consumeIndex"></param>
/// <param name="exchangeName"></param>
/// <param name="routeKey"></param>
/// <param name="queueName"></param>
public void ConnectionReceive(int consumeIndex, string exchangeName, string routeKey, string queueName)
{
_logger.LogInformation($"开始连接RabbitMQ消费者:{routeKey}");
if (null != _connectionReceive[consumeIndex] && _connectionReceive[consumeIndex].IsOpen)
{
return;
}
_connectionReceive[consumeIndex] = _connectionReceiveFactory[consumeIndex].CreateConnection(); // 创建消费者连接
if (null != _modelReceive[consumeIndex] && _modelReceive[consumeIndex].IsOpen)
{
return;
}
_modelReceive[consumeIndex] = _connectionReceive[consumeIndex].CreateModel(); // 创建消费者通道
_basicConsumer[consumeIndex] = new EventingBasicConsumer(_modelReceive[consumeIndex]);
_modelReceive[consumeIndex].ExchangeDeclare(exchangeName, ExchangeType.Direct); // 定义交换机名称和类型 与生产者保持一致
_modelReceive[consumeIndex].QueueDeclare(
queue: queueName, //消息队列名称
durable: _mqConfig.FirstOrDefault().Durable, // 是否可持久化,此处配置在文件中,默认全局持久化(true),也可以自定义更改
exclusive: false,
autoDelete: false,
arguments: null
); // 定义消费者队列
_modelReceive[consumeIndex].QueueBind(queueName, exchangeName, routeKey); // 队列绑定给指定的交换机
_modelReceive[consumeIndex].BasicQos(0, 1, false); // 设置消费者每次只接收一条消息
StartListener((model, ea) =>
{
byte[] message = ea.Body.ToArray(); // 接收到的消息
string msg = Encoding.UTF8.GetString(message);
_logger.LogInformation($"队列{queueName}接收到消息:{msg}");
Thread.Sleep(2000);
_modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true);
}, queueName, consumeIndex);
}
/// <summary>
/// 消费者接收消息的确认机制
/// </summary>
/// <param name="basicDeliverEventArgs"></param>
/// <param name="queueName"></param>
/// <param name="consumeIndex"></param>
private static void StartListener(EventHandler<BasicDeliverEventArgs> basicDeliverEventArgs, string queueName, int consumeIndex)
{
_basicConsumer[consumeIndex].Received += basicDeliverEventArgs;
_modelReceive[consumeIndex].BasicConsume(queue: queueName, autoAck: false, consumer: _basicConsumer[consumeIndex]); // 设置手动确认。
}
/// <summary>
/// 消息发布
/// </summary>
/// <param name="message"></param>
/// <param name="exchangeName"></param>
/// <param name="routingKey"></param>
public static void PublishExchange(string message, string exchangeName, string routingKey = "")
{
byte[] body = Encoding.UTF8.GetBytes(message);
_modelSend.BasicPublish(exchangeName, routingKey, null, body);
}
}

View Code

 

现在,我把整个

Wsk.Core.RabbitMQ

项目进行添加到依赖注入:

 

 

然后,在启动项目里面的初始化服务里面,添加对

MQ

连接的初始化以及连接,并且发送两条消息进行测试:

 

 

 

启用程序,提示发送成功:

 

 

打开

RabbitMQ

页面客户端,可以看见新增了一个交换机

WeskyExchange

 

 

点进去可以看见对应的流量走势:

 

 

 

 

关闭程序,现在添加消费者的初始化和连接,然后重新发送:

 

 

可见发送消息成功,并且消费者也成功接收到了消息。打开客户端查看一下:

 

 

WeskyExchange

交换机下,多了两个队列,以及队列归属的

RoutingKey

分别是

WeskyNet001

WeskyNet002

。以及在

Queue

目录下,多了两个队列的监控信息:

 

为了看出点效果,我们批量发消息试一下:

 

 

然后启动项目,我们看一下监控效果。先是交换机页面的监控:

 

 

然后是队列

1

的监控:

 

 

现在换一种写法,在消费者那边加个延迟:

 

 

并且生产者的延迟解除:

 

 

再启动一下看看效果:

 

 

会发现队列消息被堵塞,必须在执行完成以后,才可以解锁。而且生产者这边并不需要等待,可以看见消息一次性全发出去了,可以继续执行后续操作:

 

 

以上就是关于使用

Direct

模式进行

RabbitMQ

收发消息的内容,发送消息可以在其他类里面或者方法里面,直接通过静态方法进行发送;接收消息,启动了监听,就可以一直存活。如果有兴趣,也可以自己尝试

Fanout

Topic

等不同的模式进行测试,以及可以根据不同的机器,进行配置成收发到不同服务器上面进行通信。

 

原文: https://www.cnblogs.com/weskynet/p/14878337.html

暂无评论