English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
Redis Stream è una nuova struttura dati aggiunta nella versione 5.0 di Redis.
Redis Stream è principalmente utilizzato per le code di messaggi (MQ, Message Queue), Redis ha già una funzione di pubblicazione/iscrizione (pub/sub) per implementare la funzione di coda di messaggi, ma ha un difetto: i messaggi non possono essere persistenti. Se si verifica un'interruzione di rete, un malfunzionamento di Redis, ecc., i messaggi saranno persi.
In sintesi, il modello di pubblicazione/iscrizione (pub/sub) può distribuire messaggi, ma non può registrare messaggi storici.
Redis Stream offre funzionalità di persistenza dei messaggi e replica principale/secondaria, che permettono a qualsiasi client di accedere ai dati in qualsiasi momento e ricordare la posizione di accesso di ogni client, garantendo anche che i messaggi non vengano persi.
La struttura di Redis Stream è come segue, ha una coda di messaggi che catena tutti i messaggi aggiunti, ogni messaggio ha un ID unico e il contenuto corrispondente:
Ogni Stream ha un nome univoco, che è la chiave di Redis, viene creato automaticamente quando utilizziamo per la prima volta l'istruzione xadd per aggiungere messaggi.
Analisi dell'immagine sopra:
Consumer Group :Gruppo di consumo, creato utilizzando il comando XGROUP CREATE, un gruppo di consumo può avere più consumatori (Consumer).
last_delivered_id :Cursor, ogni gruppo di consumo ha un cursor last_delivered_id, ogni volta che un consumatore legge un messaggio, il cursor last_delivered_id si sposta avanti.
pending_ids :Variabile di stato del consumatore (Consumer), utilizzata per mantenere l'id non confermato del consumatore. pending_ids registra i messaggi già letti dal client ma non ancora confermati (carattere di conferma: Acknowledge).
Comandi relativi alla coda di messaggi:
XADD - Aggiungi messaggi alla fine
XTRIM - Taglia il flusso, limitando la lunghezza
XDEL - Elimina i messaggi
XLEN - Ottieni il numero di elementi contenuti nel flusso, ossia la lunghezza dei messaggi
XRANGE - Ottieni l'elenco dei messaggi, filtrando automaticamente i messaggi eliminati
XREVRANGE - Ottieni l'elenco dei messaggi in ordine inverso, ID dal maggiore al minore
XREAD - Ottieni l'elenco dei messaggi in modo bloccante o non bloccante
Comandi relativi al gruppo di consumatori:
XGROUP CREATE - Crea un gruppo di consumatori
XREADGROUP GROUP - Leggi i messaggi dal gruppo di consumatori
XACK - Marca i messaggi come "elaborati"
XGROUP SETID - Imposta un nuovo ID del messaggio last delivered per il gruppo di consumatori
XGROUP DELCONSUMER - Elimina il consumatore
XGROUP DESTROY - Elimina il gruppo di consumatori
XPENDING - Mostra informazioni relative ai messaggi in attesa di elaborazione
XCLAIM - Trasferisci il possesso dei messaggi
XINFO - Visualizza informazioni relative al flusso e ai gruppi di consumatori
XINFO GROUPS - Stampa le informazioni del gruppo di consumatori
XINFO STREAM - Stampa le informazioni di flusso
Utilizzare XADD per aggiungere messaggi alla coda, se la coda specificata non esiste, viene creata una coda, la sintassi di XADD è:
XADD chiave ID campo valore [campo valore ...]
key :队列名称,如果不存在就创建
ID Messaggio id, utilizziamo * per rappresentare quello generato da redis, può essere personalizzato, ma deve essere garantito incrementale.
field value Record
redis> XADD mystream * name Sara surname O'Connor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (intero) 2 redis> XRANGE mystream - + 1) 1) "1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "O'Connor" 2) 1) "1601372323627-1" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3" redis>
Utilizzare XTRIM per ridurre la lunghezza di un flusso, formato grammaticale:
XTRIM key MAXLEN [~] count
key :队列名称
MAXLEN Lunghezza
count :数量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (intero) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) "1601372434568-0" 2) 1) "field1" 2) "A" 3) "field2" 4) "B" 5) "field3" 6) "C" 7) "field4" 8) "D" 127.0.0.1:6379> redis>
Utilizzare XDEL per eliminare messaggi, formato grammaticale:
XDEL key ID [ID ...]
key:队列名称
ID :消息 ID
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (intero) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) "1" 2) 1) 1538561701744-0 2) 1) "c" 2) "3"
使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:
XLEN key
key:队列名称
redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis>
使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XRANGE key start end [COUNT count]
key :队列名
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count :数量
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis>
使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XREVRANGE key end start [COUNT count]
key :队列名
end :结束值, + 表示最大值
start :开始值, - 表示最小值
count :数量
redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) "1601372731459-3" 2) 1) "name" 2) "Ngozi" 3) "surname" 4) "Adichie" redis>
使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key [...]] id [id ...]
count :数量
milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :队列名
id :消息 ID
# 从 Stream 头部读取两条消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) "1532" 3) "event-id" 4) "5" 5) "user-id" 6) "7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) "812" 3) "event-id" 4) "9" 5) "user-id" 6) "388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
使用 XGROUP CREATE 创建消费者组,语法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key :队列名称,如果不存在就创建
groupname :组名。
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0
从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $
使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group :消费组名
consumer :消费者名。
count : 读取数量。
milliseconds : 阻塞毫秒数。
key : 队列名。
ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >