【.NET 6】RabbitMQ延遲消費指南( 三 )

DMConsumer完整實現如下
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct, durable: true, autoDelete: false);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dmExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dm", configure => configure.AsDelayedExchange(ExchangeType.Direct));//兩個交換機綁定await bus.Advanced.BindAsync(dmExchange, sourceExchange, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 開始消費 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers["biz-delayed"] = 1;message.WithDelay(TimeSpan.FromHours(1));await bus.Advanced.PublishAsync(dmExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延遲消費");}else{//假裝在消費//Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消費");}}相比于DLXConsumerDMConsumer里,我們不需要額外的隊列,只需要創建orders.notification_dm交換機即可,同時直接將交換機綁定到orders.notification交換機 , EasyNetQ里使用AsDelayedExchange指示該交換機為延遲交換機,使用WithDelay設置消息延遲時間通過查看EasyNetQ源碼 https://github.com/EasyNetQ/EasyNetQ/blob/master/Source/EasyNetQ/DelayedExchangeExtensions.cs , 它封裝延遲交換機的設置

【.NET 6】RabbitMQ延遲消費指南

文章插圖
啟動Producer再生成一些數據,然后運行DLXConsumer看效果,和DLXConsumer一樣
【【.NET 6】RabbitMQ延遲消費指南】
【.NET 6】RabbitMQ延遲消費指南

文章插圖
打開RabbitMQ后臺,可以看到多了一個類型為x-delayed-messageorders.notification_dm交換機,帶有DMArgs兩個標簽
【.NET 6】RabbitMQ延遲消費指南

文章插圖
進入交換機 , 可以看到里面已經存儲了13條消息 。
【.NET 6】RabbitMQ延遲消費指南

文章插圖
總結自此,利用隊列的死信交換機策略和利用rabbitmq_delayed_message_exchange插件實現RabbitMQ消息延遲已經介紹完畢 , 下面是.NET6 demo完整的項目結構
【.NET 6】RabbitMQ延遲消費指南

文章插圖
其實除了這兩種,EasyNetQ也有一個調度器(Scheduler)可以實現延遲消息,但似乎需要依賴數據庫,不是主流的做法不推薦使用 。
如有任何問題或者意見,歡迎評論 。

推薦閱讀