更新時間:2022-08-17 來源:黑馬程序員 瀏覽量:
消息隊列(Message Queue),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括3個角色:
消息隊列:存儲消息。
生產(chǎn)者:發(fā)送消息到消息隊列,在秒殺任務中負責判斷秒殺時間和庫存,校驗消費者權(quán)限是否是一人一單,發(fā)送優(yōu)惠券id和用戶id到消息隊列中。
消費者:從消息隊列獲取消息并處理消息,接受到訂單消息之后,完成下單。
Redis提供了三種不同的方式來實現(xiàn)消息隊列:
list結(jié)構(gòu):基于List結(jié)構(gòu)模擬消息隊列。
PubSub:基本的點對點消息模型。
Stream:比較完善的消息隊列模型。
消息隊列(Message Queue),字面意思就是存放消息的隊列。而Redis的list數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表,很容易模擬出隊列效果。
隊列是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP來實現(xiàn)。
不過要注意的是,當隊列中沒有消息時RPOP或LPOP操作會返回null,并不像JVM的阻塞隊列那樣會阻塞并等待消息。因此這里應該使用BRPOP或者BLOPO來實現(xiàn)阻塞效果。
List的消息隊列可利用Redis存儲,不受限于JVM內(nèi)存上限,是基于基于Redis的持久化機制,數(shù)據(jù)安全性有保證,同時也可以滿足消息隊列的有序性。但只支持但消費者,無法避免消息丟失是它最大的問題。
PubSub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產(chǎn)者向?qū)猚hannel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。對應channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。
SUBSCRIBE channel [channel] :訂閱一個或多個頻道。
PUBLISH channel msg :向一個頻道發(fā)送消息。
PSUBSCRIBE pattern[pattern] :訂閱與pattern格式匹配的所有頻道。
基于PubSub的消息隊列采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費,但也有不支持數(shù)據(jù)持久化、無法避免消息丟失,消息堆積有上限的缺點,超出時數(shù)據(jù)會丟失。
例如:
Stream讀取消息的方式之一:XREAD
例如:
在業(yè)務開發(fā)中,我們可以循環(huán)的調(diào)用XREAD阻塞方式來查詢最新消息,從而實現(xiàn)持續(xù)監(jiān)聽隊列的效果,偽代碼如下:
注意:當我們指定起始ID為$時,代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過1條以上的消息到達隊列,則下次獲取時也只能獲取到最新的一條,會出現(xiàn)漏讀消息的問題。
STREAM類型消息隊列的XREAD命令,消息可回溯,一個消息可以被多個消費者讀取,同時可能阻塞讀取,有消息漏讀的風險。
基于Stream的消息隊列-消費者組
消費者組(Consumer Group):將多個消費者劃分到一個組中,監(jiān)聽同一個隊列。具備下列特點:
1.消息分流:隊列中的消息會分流給組內(nèi)的不同消費者,而不是重復消費,從而加快消息處理的速度。
2.消息標示:消費者組會維護一個標示,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標示之后讀取消息。確保每一個消息都會被消費。
3.消息確認:消費者獲取消息后,消息處于pending狀態(tài),并存入一個pending-list。當處理完成后需要通過XACK來確認消息,標記消息為已處理,才會從pending-list移除。
創(chuàng)建消費者組:
XGROUP CREATE key groupName ID [MKSTREAM]
key:隊列名稱
groupName:消費者組名稱
ID:起始ID標示,$代表隊列中最后一個消息,0則代表隊列中第一個消息
MKSTREAM:隊列不存在時自動創(chuàng)建隊列。
其它常見命令:
# 刪除指定的消費者組 XGROUP DESTORY key groupName # 給指定的消費者組添加消費者 XGROUP CREATECONSUMER key groupname consumername # 刪除消費者組中的指定消費者 XGROUP DELCONSUMER key groupname consumername
從消費者組讀取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消費組名稱
consumer:消費者名稱,如果消費者不存在,會自動創(chuàng)建一個消費者
count:本次查詢的最大數(shù)量
BLOCK milliseconds:當沒有消息時最長等待時間
NOACK:無需手動ACK,獲取到消息后自動確認
STREAMS key:指定隊列名稱
ID:獲取消息的起始ID:">":從下一個未消費的消息開始
其它:根據(jù)指定id從pending-list中獲取已消費但未確認的消息,例如0,是從pending-list中的第一個消息開始。
消費者監(jiān)聽消息的基本思路:
STREAM類型消息隊列的XREADGROUP命令特點:消息可回溯,一個消息可以只能被一個消費者讀取??梢宰枞x取.沒有消息漏讀的風險,有消息確認機制,保證消息至少被消費一次。