English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Redis Stream

Redis Stream è una nuova struttura dati aggiunta nella versione 5.0 di Redis.

Redis Stream è principalmente utilizzato per le code di messaggi (MQ, Message Queue), Redis ha già una funzione di pubblicazione/iscrizione (pub/sub) per implementare la funzione di coda di messaggi, ma ha un difetto: i messaggi non possono essere persistenti. Se si verifica un'interruzione di rete, un malfunzionamento di Redis, ecc., i messaggi saranno persi.

In sintesi, il modello di pubblicazione/iscrizione (pub/sub) può distribuire messaggi, ma non può registrare messaggi storici.

Redis Stream offre funzionalità di persistenza dei messaggi e replica principale/secondaria, che permettono a qualsiasi client di accedere ai dati in qualsiasi momento e ricordare la posizione di accesso di ogni client, garantendo anche che i messaggi non vengano persi.

La struttura di Redis Stream è come segue, ha una coda di messaggi che catena tutti i messaggi aggiunti, ogni messaggio ha un ID unico e il contenuto corrispondente:

Ogni Stream ha un nome univoco, che è la chiave di Redis, viene creato automaticamente quando utilizziamo per la prima volta l'istruzione xadd per aggiungere messaggi.

Analisi dell'immagine sopra:

  • Consumer Group :Gruppo di consumo, creato utilizzando il comando XGROUP CREATE, un gruppo di consumo può avere più consumatori (Consumer).

  • last_delivered_id :Cursor, ogni gruppo di consumo ha un cursor last_delivered_id, ogni volta che un consumatore legge un messaggio, il cursor last_delivered_id si sposta avanti.

  • pending_ids :Variabile di stato del consumatore (Consumer), utilizzata per mantenere l'id non confermato del consumatore. pending_ids registra i messaggi già letti dal client ma non ancora confermati (carattere di conferma: Acknowledge).

Comandi relativi alla coda di messaggi:

  • XADD - Aggiungi messaggi alla fine

  • XTRIM - Taglia il flusso, limitando la lunghezza

  • XDEL - Elimina i messaggi

  • XLEN - Ottieni il numero di elementi contenuti nel flusso, ossia la lunghezza dei messaggi

  • XRANGE - Ottieni l'elenco dei messaggi, filtrando automaticamente i messaggi eliminati

  • XREVRANGE - Ottieni l'elenco dei messaggi in ordine inverso, ID dal maggiore al minore

  • XREAD - Ottieni l'elenco dei messaggi in modo bloccante o non bloccante

Comandi relativi al gruppo di consumatori:

  • XGROUP CREATE - Crea un gruppo di consumatori

  • XREADGROUP GROUP - Leggi i messaggi dal gruppo di consumatori

  • XACK - Marca i messaggi come "elaborati"

  • XGROUP SETID - Imposta un nuovo ID del messaggio last delivered per il gruppo di consumatori

  • XGROUP DELCONSUMER - Elimina il consumatore

  • XGROUP DESTROY - Elimina il gruppo di consumatori

  • XPENDING - Mostra informazioni relative ai messaggi in attesa di elaborazione

  • XCLAIM - Trasferisci il possesso dei messaggi

  • XINFO - Visualizza informazioni relative al flusso e ai gruppi di consumatori

  • XINFO GROUPS - Stampa le informazioni del gruppo di consumatori

  • XINFO STREAM - Stampa le informazioni di flusso

XADD

Utilizzare XADD per aggiungere messaggi alla coda, se la coda specificata non esiste, viene creata una coda, la sintassi di XADD è:

XADD chiave ID campo valore [campo valore ...]
  • key :队列名称,如果不存在就创建

  • ID Messaggio id, utilizziamo * per rappresentare quello generato da redis, può essere personalizzato, ma deve essere garantito incrementale.

  • field value Record

redis> XADD mystream * name Sara surname O'Connor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(intero) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "O'Connor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
redis>

XTRIM

Utilizzare XTRIM per ridurre la lunghezza di un flusso, formato grammaticale:

XTRIM key MAXLEN [~] count
  • key :队列名称

  • MAXLEN Lunghezza

  • count :数量

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(intero) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379> 
redis>

XDEL

Utilizzare XDEL per eliminare messaggi, formato grammaticale:

XDEL key ID [ID ...]
  • key:队列名称

  • ID :消息 ID

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(intero) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

XLEN

使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:

XLEN key
  • key:队列名称

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • key :队列名

  • start :开始值, - 表示最小值

  • end :结束值, + 表示最大值

  • count :数量

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • key :队列名

  • end :结束值, + 表示最大值

  • start :开始值, - 表示最小值

  • count :数量

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key [...]] id [id ...]
  • count :数量

  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式

  • key :队列名

  • id :消息 ID

# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建

  • groupname :组名。

  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

XREADGROUP GROUP

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名

  • consumer :消费者名。

  • count : 读取数量。

  • milliseconds : 阻塞毫秒数。

  • key : 队列名。

  • ID : 消息 ID。

XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >