RabbitMQ如何實現延遲佇列

2022-06-23 21:34:35 字數 3084 閱讀 6666

rabbitmq如何實現延遲佇列:延遲佇列儲存的物件肯定是對應的延遲訊息,所謂”延遲訊息”是指當訊息被髮送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。

場景一:在訂單系統中,一個使用者下單之後通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那麼這個訂單將進行一場處理。這是就可以使用延遲佇列將訂單資訊傳送到延遲佇列。

場景二:使用者希望通過手機遠端遙控家裡的智慧裝置在指定的時間進行工作。這時候就可以將使用者指令傳送到延遲佇列,當指令設定的時間到了再將指令推送到只能裝置。

rabbitmq怎麼實現延遲佇列

amqp協議,以及rabbitmq本身沒有直接支援延遲佇列的功能,但是可以通過ttl和dlx模擬出延遲佇列的功能。

ttl(time to live)

rabbitmq可以針對queue和message設定 x-message-tt,來控制訊息的生存時間,如果超時,則訊息變為dead letter

rabbitmq針對佇列中的訊息過期時間有兩種方法可以設定。

a: 通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。 b: 對訊息進行單獨設定,每條訊息ttl可以不同。如果同時使用,則訊息的過期時間以兩者之間ttl較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的ttl值,就成為dead letter

詳細可以參考:rabbitmq之ttl(time-to-live 過期時間)

dlx (dead-letter-exchange)

rabbitmq的queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由。

x-dead-letter-exchange:出現dead letter之後將dead letter重新傳送到指定exchange x-dead-letter-routing-key:指定routing-key傳送佇列出現dead letter的情況有:

訊息或者佇列的ttl過期 佇列達到最大長度 訊息被消費端拒絕(basic.reject or basic.nack)並且requeue=false

利用dlx,當訊息在一個佇列中變成死信後,它能被重新publish到另一個exchange。這時候訊息就可以重新被消費。詳細可以參考:rabbitmq之死信佇列

**示例

首先建立2個exchange和2個queue:

exchange_delay_begin:這個是producer端傳送時呼叫的exchange, 將訊息傳送至queue_dealy_begin中。 queue_delay_begin: 通過routingkey=”delay”繫結exchang_delay_begin, 同時配置dlx=exchange_delay_done, 當訊息變成死信時,發往exchange_delay_done中。 exchange_delay_done: 死信的exchange, 如果不配置x-dead-letter-routing-key則採用原有預設的routingkey,即queue_delay_begin繫結exchang_delay_beghin採用的“delay”。 queue_delay_done:訊息在ttl到期之後,最終通過exchang_delay_done傳送值此queue,消費端通過消費此queue的訊息,即可以達到延遲的效果。1. 建立exchange和queue的**(當然這裡可以通過rabbitmq的管理介面來實現,無需code相關**):12

3456

78910

channel.exchangedeclare("exchange_delay_begin", "direct", true);

channel.exchangedeclare("exchange_delay_done", "direct", true);

mapargs = new hashmap();

args.put("x-dead-letter-exchange", "exchange_delay_done");

channel.queuedeclare("queue_delay_begin", true, false, false, args);

channel.queuedeclare("queue_delay_done", true, false, false, null);

channel.queuebind("queue_delay_begin", "exchange_delay_begin", "delay");

channel.queuebind("queue_delay_done", "exchange_delay_done", "delay");

2. consumer端**:12

3456

789queueingconsumer consumer = new queueingconsumer(channel);

channel.basicconsume("queue_delay_done", false, consumer);

while (true)

3. producer端**:設定訊息的延遲時間為1min。12

3456

7amqp.basicproperties.builder builder = new amqp.basicproperties.builder();

builder.expiration("60000");//設定訊息ttl

builder.deliverymode(2);//設定訊息持久化

amqp.basicproperties properties = builder.build();

string message = string.valueof(new date());

channel.basicpublish("exchange_delay_begin","delay",properties,message.getbytes());

在建立完exchange和queue之後,首先執行consumer端的**,之後執行producer端的**,待producer傳送完畢之後,檢視consumer端的輸出:

1receive msg time:tue feb 14 21:06:19 cst 2017, msg body:tue feb 14 21:05:19 cst 2017

可以看到延遲1min消費了相關訊息。