.netcore3.1 RabbitMq Single Active Consumer 电脑版发表于:2021/2/2 15:38  >#.netcore3.1 RabbitMq Single Active Consumer [TOC] Single Active Consumer ------------ tn>单个活动的消费者每次只能从队列中消费一个消费者,并在活动的消费者被取消或死亡的情况下故障转移到另一个注册的消费者。当必须按照消息到达队列的顺序来使用和处理消息时,仅与一个使用者一起使用很有用。举个例子:  >### 创建队列 tn>我们使用`x-single-active-consumer`参数来定义一个`Single_Active_Consumer_Queue`的队列。 ```csharp static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 声明一个Single_Active_Consumer_Queue队列 channel.QueueDeclare( queue: "Single_Active_Consumer_Queue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string,object>{ { "x-single-active-consumer",true }, }); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } ``` tn>运行并创建对应的队列,在UI中查看。  >### 创建消费端代码 tn>我们这里通过从`args`接收一个参数作为消费端的名称。 ```csharp var factory = new ConnectionFactory() { HostName = "47.98.187.188", UserName = "bob", Password = "bob" }; string Name = args[0]; // 创建一个链接 using (var connection = factory.CreateConnection()) { // 创建一个通道 using (var channel = connection.CreateModel()) { // 我们这里只需要获取去B餐厅的那个人就可以了 var consumer = new EventingBasicConsumer(channel); // 绑定处理事件 channel.BasicConsume(queue: "Single_Active_Consumer_Queue", autoAck: false, consumer: consumer); // B餐厅消费 consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var props = ea.BasicProperties; try { var message = Encoding.UTF8.GetString(body); Console.WriteLine("{0} Message: {1}", Name, message); } catch (Exception e) { Console.WriteLine("Have Error:" + e.Message); } finally { channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } ``` tn>并在运行目录中打开两个客户端`A_Client`与`B_Client`,注意程序有启动先打开`A_Client`再打开`B_Client`. >### 添加消息 tn>我们将`10`条消息放入至`Single_Active_Consumer_Queue`队列中 ```csharp for (int i = 0; i < 10; i++) { var msg = Encoding.UTF8.GetBytes($"Message {i} ,Priority: 1"); channel.BasicPublish(string.Empty, "Single_Active_Consumer_Queue", null, msg); } ```  tn>我们发现这并不符合我们的程序设计,这样`A`都把活路干完了,我们设置让`A`只干`5`条数据 >### 调整消费端处理量 ```csharp // 我们这里只需要获取去B餐厅的那个人就可以了 var consumer = new EventingBasicConsumer(channel); // 绑定处理事件 channel.BasicConsume(queue: "Single_Active_Consumer_Queue", autoAck: false, consumer: consumer); // B餐厅消费 consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var props = ea.BasicProperties; try { var message = Encoding.UTF8.GetString(body); Console.WriteLine("{0} Message: {1}", Name, message); // 处理4条就退出 if (Name.Contains("A") && ea.DeliveryTag == 4) { Console.WriteLine("A go home!!!!"); // 退出程序的代码 Environment.Exit(0); } } catch (Exception e) { Console.WriteLine("Have Error:" + e.Message); } finally { channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); ```   tn>由于在`A_Client`中消息`3`没有被交付后面又再一次处理了,但是我们做了更多的测试发现也会存在下面的结果。   tn>这是由于事件异步,`A_Client`关得太快而导致该问题的发生,如何解决呢?我们可以尝试一下添加`BasicQos`,我在处理这条消息时,在没处理完之前不要给我推消息过来处理。 ```csharp channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); ``` 