搭建RabbitMQ在.Net Core的简单用法
- 如果还没有MQ环境,可以参考上一篇的博客,在windows系统上的rabbitmq环境搭建
windows10环境下的RabbitMQ安装步骤 - 如果使用docker环境,可以直接百度一下。
打开Nuget安装 RabbitMQ.Client
生产者
/// <summary>
/// 生产者
/// </summary>
/// <param name="messageBody">消息</param>
/// <returns></returns>
[HttpGet]
public IActionResult AddMQExchange(string messageBody)
{
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
////连接MQ服务器
using (var connection = factory.CreateConnection())
{
var queueName = "tkw_Queue_001";
var exchangeName = "tkw_Exchange_001";
//创建信道,通信管道
using var channel = connection.CreateModel();
//创建一个 消息队列
channel.QueueDeclare(queueName, true, false, false);
//创建一个交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false);
//绑定 消息队列 到 交换机
channel.QueueBind(queueName, exchangeName, string.Empty, null);
//准备就绪,开始写入...
//发布数据 到 交换机->自动转发到->队列
var byteBody = Encoding.UTF8.GetBytes(messageBody);
channel.BasicPublish(exchangeName, string.Empty, null, byteBody);
}
return new JsonResult(new { Code = 200, Msg = $"写入成功,messageBody={messageBody}" });
}
消费者
/// <summary>
/// 消费者
/// </summary>
private static void DoConsumerTask()
{
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
using (var connection = factory.CreateConnection())//连接服务器,即正在创建终结点。
{
var queueName = "tkw_Queue_001";
//var exchangeName = "tkw_Exchange_001";
//创建信道,通信管道
using var channel = connection.CreateModel();
try
{
//没绑定交换机,但由于rabbitmq有默认交换机
var consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (model, ea) =>
{
var byteBody = ea.Body;
var strBody = Encoding.UTF8.GetString(byteBody.ToArray());
Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}]消费者,接受消息:{strBody}");
//此处手动签收模式
channel.BasicAck(ea.DeliveryTag, true);
};
//启动消费者
channel.BasicConsume(
queueName,
autoAck: false//签收模式
, consumer);
Console.WriteLine("消费者已启动");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine($"异常:" + ex.ToString());
}
}
}