欢迎光临
个人技术文档整理

.Net Core 搭建RabbitMQ消息队列生产者和消费者的简单方法

搭建RabbitMQ在.Net Core的简单用法

  • 如果还没有MQ环境,可以参考上一篇的博客,在windows系统上的rabbitmq环境搭建
    windows10环境下的RabbitMQ安装步骤 
  • 如果使用docker环境,可以直接百度一下。
     

打开Nuget安装 RabbitMQ.Client

生产者

        /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="messageBody">消息</param>
        /// <returns></returns>
        [HttpGet]
        public IActionResult AddMQExchange(string messageBody)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1"; //主机名
            factory.UserName = "guest";//使用的用户
            factory.Password = "guest";//用户密码
            factory.Port = 5672;//端口号
            factory.VirtualHost = "/"; //虚拟主机
            factory.MaxMessageSize = 1024; //消息最大字节数 

            ////连接MQ服务器 
            using (var connection = factory.CreateConnection())
            {

                var queueName = "tkw_Queue_001";
                var exchangeName = "tkw_Exchange_001";

                //创建信道,通信管道
                using var channel = connection.CreateModel();

                //创建一个 消息队列
                channel.QueueDeclare(queueName, true, false, false);

                //创建一个交换机
                channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false);

                //绑定 消息队列 到 交换机
                channel.QueueBind(queueName, exchangeName, string.Empty, null);

                //准备就绪,开始写入... 
                //发布数据 到 交换机->自动转发到->队列 
                var byteBody = Encoding.UTF8.GetBytes(messageBody);
                channel.BasicPublish(exchangeName, string.Empty, null, byteBody);
                 
            }

            return new JsonResult(new { Code = 200, Msg = $"写入成功,messageBody={messageBody}" });

        }

消费者

        /// <summary>
        /// 消费者
        /// </summary>
        private static void DoConsumerTask()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1"; //主机名
            factory.UserName = "guest";//使用的用户
            factory.Password = "guest";//用户密码
            factory.Port = 5672;//端口号
            factory.VirtualHost = "/"; //虚拟主机
            factory.MaxMessageSize = 1024; //消息最大字节数 
            using (var connection = factory.CreateConnection())//连接服务器,即正在创建终结点。
            {
                var queueName = "tkw_Queue_001";
                //var exchangeName = "tkw_Exchange_001";

                //创建信道,通信管道
                using var channel = connection.CreateModel();

                try
                {
                    //没绑定交换机,但由于rabbitmq有默认交换机
                    var consumer = new EventingBasicConsumer(channel);

                    //接收到消息事件
                    consumer.Received += (model, ea) =>
                    {
                        var byteBody = ea.Body;
                        var strBody = Encoding.UTF8.GetString(byteBody.ToArray());

                        Console.WriteLine($"[{Thread.CurrentThread.ManagedThreadId}]消费者,接受消息:{strBody}");

                            //此处手动签收模式
                            channel.BasicAck(ea.DeliveryTag, true);

                    };

                    //启动消费者  
                    channel.BasicConsume(
                        queueName,
                          autoAck: false//签收模式
                         , consumer);

                    Console.WriteLine("消费者已启动");
                    Console.ReadLine();
               

                }
                catch (Exception ex)
                {
                    Console.WriteLine($"异常:" + ex.ToString());
                }
            }
             
        }

 

赞(2)