English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

python队列通信:rabbitMQ的使用(实例讲解)

(primo) Introduzione

Perché introdurre la coda di messaggi?

1. Decoupling del programma

2. Migliorare le prestazioni

3. Ridurre la complessità delle logiche aziendali

(due) Operazioni RabbitMQ con Python

La configurazione, l'installazione e l'uso di base di RabbitMQ sono descritti nell'articolo precedente, non verranno ripetuti.

Per utilizzare RabbitMQ con Python, è necessario installare il modulo pika, installare direttamente con pip:

pip install pika

1. La più semplice conversazione tra produttore e consumatore di RabbitMQ:

producer:

#Author :ywq
import pika
auth=pika.PlainCredentials('ywq','qwe') #salvare informazioni di autenticazione
connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #connettiti a rabbit
channel = connection.channel() #creare canale
channel.queue_declare(queue='hello') #dichiarare coda
#n RabbitMQ una messaggio non può essere inviato direttamente alla coda, deve sempre passare attraverso un'exchange.
channel.basic_publish(exchange='',
   routing_key='hello',
   body='Hello World!') #il contenuto del corpo è il messaggio
print(" [x] Inviato 'Hello World!'")
connection.close()

consumer:

#Author :ywq
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #connettiti a rabbit
channel = connection.channel()  #crea canale
channel.queue_declare(queue='hello') #decalre coda
def callback(ch, method, properties, body):
 print(" [x] Ricevuto %r" % body)
channel.basic_consume(callback,
   queue='hello',
   no_ack=True)
print(' [*] In attesa di messaggi. Per uscire premere CTRL+C')
channel.start_consuming()

Durante il processo di trasmissione e consumo dei messaggi, è possibile visualizzare in tempo reale le informazioni sulla coda dei messaggi nella pagina di gestione web di rabbit.

2. Coda di messaggi persistente, evitare la perdita della coda di messaggi a causa di guasti o altre circostanze impreviste.

Non è necessario modificare il consumatore, aggiungere due proprietà nel codice del produttore, rispettivamente per rendere i messaggi e la coda persistenti, è necessario attivare entrambi per evitare la perdita di messaggi:

delivery_mode=2 #rendere msg persistente
durable=True

La posizione di inserimento delle proprietà è illustrata nel codice seguente (prodotto):

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth_info
 ))
channel=connection.channel()
channel.queue_declare(queue='test1',durable=True) #durable=True, rendere la coda persistente
msg=''.join(sys.argv[1:]) or 'Hello'
channel.basic_publish(
 exchange='',
 routing_key='test1',
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2 #rendere msg persistente
 )
)
print('Send done:', msg)
connection.close()

3. Distribuzione equa

Nel caso di più consumer, Rabbit per default invia messaggi in modo round-robin, ma alcuni consumer consumano più velocemente di altri, per un utilizzo delle risorse più equo, viene introdotta la meccanica di conferma ack. Dopo aver consumato i messaggi, il consumer invierà un ack a Rabbit, una volta che il numero di messaggi non confermati supera il numero specificato, non verranno più inviati a questo consumer, ma a altri consumer.

Il codice del lato producer non deve essere modificato, è necessario inserire due attributi nel codice del lato consumer:

channel.basic_qos(prefetch_count= *) # definisci il numero massimo di non_ack_count
channel.basic_ack(delivery_tag=deliver.delivery_tag) # invia ack a RabbitMQ

La posizione di inserimento dell'attributo è come segue (lato consumer):

#Author :ywq
import pika,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.queue_declare(queue='test2',durable=True)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 time.sleep(5)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) # invia ack a rabbit
chann.basic_qos(prefetch_count=1)
'''
Attenzione, no_ack=False Attenzione, qui il tipo no_ack è solo per informare Rabbit se questo consumer deve restituire un ack, se si desidera restituire un ack, è necessario definire nel callback
prefetch_count=1, se il numero di messaggi non confermati supera 1, questo consumer non accetta più messaggi, questa configurazione deve essere scritta sopra channel.basic_consume, altrimenti può verificarsi una situazione di non_ack.
'''
channel.basic_consume(
 callback,
 queue='test2'
)
channel.start_consuming()

Terzo: Pubblicazione/Sottoscrizione dei messaggi

I modelli superiori sono tutti inviati una volta dal lato producer e ricevuti una volta dal lato consumer, ma è possibile implementare un producer che invia e più consumer associati ricevono contemporaneamente? Certo, Rabbit supporta la pubblicazione e la sottoscrizione dei messaggi, supporta tre modelli, tramite il componente del trasmettitore exchange, realizza tre modelli:

fanout: Tutte le ceste bindate a questo exchange possono ricevere messaggi, simile alla trasmissione in broadcast.

direct: Attraverso il routingKey e l'exchange viene determinata quale cesta unica può ricevere messaggi, inviati ai consumer che hanno bindato questa cesta, simile al multicast.

topic: Tutte le ceste di routingKey (che può essere un'espressione in questo momento) collegate alle ceste bind possono ricevere messaggi, simile alla corrispondenza del percorso di prefisso.

1.fanout

publish end (producer):

#Author :ywq
import pika,sys,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='hello',
    exchange_type='fanout'
    )
msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time()
channel.basic_publish(
 exchange='hello',
 routing_key='',
 body=msg,
 properties=pika.BasicProperties(
 delivery_mode=2
 )
)
print('send done')
connection.close()

subscribe end (consumer):

#Author :ywq
import pika
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(
 exchange='hello',
 exchange_type='fanout'
)
random_num=channel.queue_declare(exclusive=True) # crea una coda casuale con rabbit, la coda viene eliminata e liberata immediatamente dopo la disconnessione del consumer
queue_name=random_num.method.queue
chann.basic_qos(prefetch_count=1)
channel.queue_bind(
 queue=queue_name,
 exchange='hello'
)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) # invia ack a rabbit
channel.basic_consume(
 callback,
 queue=queue_name,
)
channel.start_consuming()

Realizzare l'invio di un producer una volta e la ricezione di più consumer associati.

Quando si utilizza il modello exchange:

1.Sul lato producer non è necessario dichiarare la coda, ma direttamente l'exchange

2.Sul lato consumer è necessario associare la coda e specificare l'exchange per ricevere i messaggi

3.E' meglio che il consumer crei una coda casuale e la liberi immediatamente dopo l'uso.

Il nome della coda casuale può essere rilevato sotto web:

2.direct

Utilizzando l'exchange, il consumer può ricevere messaggi in modo selettivo. La coda è associata a una chiave, il producer invia i dati all'exchange in base alla chiave, l'exchange determina la coda di destinazione in base alla chiave e il consumer riceve corrispondentemente. Questo significa che è stato aggiunto un routing key sul modello fanout.

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='direct_log',
   exchange_type='direct',
   )
while True:
 route_key=input('Input routing key:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='direct_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='direct_log',
 exchange_type='direct'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')
channel.queue_bind(
 queue=queue_name,
 exchange='direct_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[level:%s],[msg:%s]' %(route_key,body))
 ch.basic_ack(delivery_tag=deliver.delivery_tag)
chann.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

Aprire contemporaneamente più consumer, di cui due ricevono notice, due ricevono warning, l'effetto di esecuzione è il seguente:

3.topic

Confrontato con direct, topic può implementare un modo di lavoro di corrispondenza vaga (specificato dal lato consumer), se il routing key contiene la parola chiave specificata, allora il msg verrà inviato alla queue associata.

Regole di corrispondenza del simbolo wildcard di rabbitmq:

Il simbolo ‘#’ corrisponde a una o più parole, il simbolo ‘’ corrisponde a una parola. Pertanto ‘abc.#’ può corrispondere a ‘abc.m.n’, ma ‘abc.*’ corrisponderà solo a ‘abc.m’. Il ‘.’ è il simbolo di separazione. Durante l'uso dei segni di asterisco deve essere utilizzato il ‘.’ come simbolo di separazione.

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='topic_log',
   exchange_type='topic',
   )
while True:
 route_key=input('Input routing key:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='topic_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='topic_log',
 exchange_type='topic'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')
channel.queue_bind(
 queue=queue_name,
 exchange='topic_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[type:%s],[msg:%s]' %(route_key,body))
 ch.basic_ack(delivery_tag=deliver.delivery_tag)
chann.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

Effetto di esecuzione:

La breve presentazione dei tre modelli publish/subscribe di rabbitmq è stata completata.

Questa guida completa su python queue communication: l'uso di rabbitMQ (esempio illustrato) è tutto ciò che l'autore ha condiviso con voi. Spero che possa essere utile come riferimento e spero che possiate sostenere il tutorial.

Dichiarazione: il contenuto di questo articolo è stato prelevato da Internet, di proprietà del rispettivo autore. Il contenuto è stato fornito volontariamente dagli utenti di Internet e caricato autonomamente. Questo sito non detiene i diritti di proprietà e non ha effettuato alcuna modifica editoriale, né assume alcuna responsabilità legale. Se trovi contenuti che violano i diritti d'autore, ti preghiamo di inviare una email a notice#oldtoolbag.com (sostituisci # con @) per segnalare il problema, fornendo prove pertinenti. Una volta verificata la veridicità, il sito eliminerà immediatamente i contenuti contestati.