English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
(primo) Introduzione
Perché introdurre la coda di messaggi?
1. Decoupling del programma
2. Migliorare le prestazioni
3. Ridurre la complessità delle logiche aziendali
(due) Operazioni RabbitMQ con Python
La configurazione, l'installazione e l'uso di base di RabbitMQ sono descritti nell'articolo precedente, non verranno ripetuti.
Per utilizzare RabbitMQ con Python, è necessario installare il modulo pika, installare direttamente con pip:
pip install pika
1. La più semplice conversazione tra produttore e consumatore di RabbitMQ:
producer:
#Author :ywq import pika auth=pika.PlainCredentials('ywq','qwe') #salvare informazioni di autenticazione connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #connettiti a rabbit channel = connection.channel() #creare canale channel.queue_declare(queue='hello') #dichiarare coda #n RabbitMQ una messaggio non può essere inviato direttamente alla coda, deve sempre passare attraverso un'exchange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') #il contenuto del corpo è il messaggio print(" [x] Inviato 'Hello World!'") connection.close()
consumer:
#Author :ywq import pika connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #connettiti a rabbit channel = connection.channel() #crea canale channel.queue_declare(queue='hello') #decalre coda def callback(ch, method, properties, body): print(" [x] Ricevuto %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] In attesa di messaggi. Per uscire premere CTRL+C') channel.start_consuming()
Durante il processo di trasmissione e consumo dei messaggi, è possibile visualizzare in tempo reale le informazioni sulla coda dei messaggi nella pagina di gestione web di rabbit.
2. Coda di messaggi persistente, evitare la perdita della coda di messaggi a causa di guasti o altre circostanze impreviste.
Non è necessario modificare il consumatore, aggiungere due proprietà nel codice del produttore, rispettivamente per rendere i messaggi e la coda persistenti, è necessario attivare entrambi per evitare la perdita di messaggi:
delivery_mode=2 #rendere msg persistente durable=True
La posizione di inserimento delle proprietà è illustrata nel codice seguente (prodotto):
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.queue_declare(queue='test1',durable=True) #durable=True, rendere la coda persistente msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='', routing_key='test1', body=msg, properties=pika.BasicProperties( delivery_mode=2 #rendere msg persistente ) ) print('Send done:', msg) connection.close()
3. Distribuzione equa
Nel caso di più consumer, Rabbit per default invia messaggi in modo round-robin, ma alcuni consumer consumano più velocemente di altri, per un utilizzo delle risorse più equo, viene introdotta la meccanica di conferma ack. Dopo aver consumato i messaggi, il consumer invierà un ack a Rabbit, una volta che il numero di messaggi non confermati supera il numero specificato, non verranno più inviati a questo consumer, ma a altri consumer.
Il codice del lato producer non deve essere modificato, è necessario inserire due attributi nel codice del lato consumer:
channel.basic_qos(prefetch_count= *) # definisci il numero massimo di non_ack_count channel.basic_ack(delivery_tag=deliver.delivery_tag) # invia ack a RabbitMQ
La posizione di inserimento dell'attributo è come segue (lato consumer):
#Author :ywq import pika,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.queue_declare(queue='test2',durable=True) def callback(chann,deliver,properties,body): print('Recv:',body) time.sleep(5) chann.basic_ack(delivery_tag=deliver.delivery_tag) # invia ack a rabbit chann.basic_qos(prefetch_count=1) ''' Attenzione, no_ack=False Attenzione, qui il tipo no_ack è solo per informare Rabbit se questo consumer deve restituire un ack, se si desidera restituire un ack, è necessario definire nel callback prefetch_count=1, se il numero di messaggi non confermati supera 1, questo consumer non accetta più messaggi, questa configurazione deve essere scritta sopra channel.basic_consume, altrimenti può verificarsi una situazione di non_ack. ''' channel.basic_consume( callback, queue='test2' ) channel.start_consuming()
Terzo: Pubblicazione/Sottoscrizione dei messaggi
I modelli superiori sono tutti inviati una volta dal lato producer e ricevuti una volta dal lato consumer, ma è possibile implementare un producer che invia e più consumer associati ricevono contemporaneamente? Certo, Rabbit supporta la pubblicazione e la sottoscrizione dei messaggi, supporta tre modelli, tramite il componente del trasmettitore exchange, realizza tre modelli:
fanout: Tutte le ceste bindate a questo exchange possono ricevere messaggi, simile alla trasmissione in broadcast.
direct: Attraverso il routingKey e l'exchange viene determinata quale cesta unica può ricevere messaggi, inviati ai consumer che hanno bindato questa cesta, simile al multicast.
topic: Tutte le ceste di routingKey (che può essere un'espressione in questo momento) collegate alle ceste bind possono ricevere messaggi, simile alla corrispondenza del percorso di prefisso.
1.fanout
publish end (producer):
#Author :ywq import pika,sys,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='hello', exchange_type='fanout' ) msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time() channel.basic_publish( exchange='hello', routing_key='', body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) print('send done') connection.close()
subscribe end (consumer):
#Author :ywq import pika auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare( exchange='hello', exchange_type='fanout' ) random_num=channel.queue_declare(exclusive=True) # crea una coda casuale con rabbit, la coda viene eliminata e liberata immediatamente dopo la disconnessione del consumer queue_name=random_num.method.queue chann.basic_qos(prefetch_count=1) channel.queue_bind( queue=queue_name, exchange='hello' ) def callback(chann,deliver,properties,body): print('Recv:',body) chann.basic_ack(delivery_tag=deliver.delivery_tag) # invia ack a rabbit channel.basic_consume( callback, queue=queue_name, ) channel.start_consuming()
Realizzare l'invio di un producer una volta e la ricezione di più consumer associati.
Quando si utilizza il modello exchange:
1.Sul lato producer non è necessario dichiarare la coda, ma direttamente l'exchange
2.Sul lato consumer è necessario associare la coda e specificare l'exchange per ricevere i messaggi
3.E' meglio che il consumer crei una coda casuale e la liberi immediatamente dopo l'uso.
Il nome della coda casuale può essere rilevato sotto web:
2.direct
Utilizzando l'exchange, il consumer può ricevere messaggi in modo selettivo. La coda è associata a una chiave, il producer invia i dati all'exchange in base alla chiave, l'exchange determina la coda di destinazione in base alla chiave e il consumer riceve corrispondentemente. Questo significa che è stato aggiunto un routing key sul modello fanout.
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='direct_log', exchange_type='direct', ) while True: route_key=input('Input routing key:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='direct_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='direct_log', exchange_type='direct' ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Input routing key:') channel.queue_bind( queue=queue_name, exchange='direct_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[level:%s],[msg:%s]' %(route_key,body)) ch.basic_ack(delivery_tag=deliver.delivery_tag) chann.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
Aprire contemporaneamente più consumer, di cui due ricevono notice, due ricevono warning, l'effetto di esecuzione è il seguente:
3.topic
Confrontato con direct, topic può implementare un modo di lavoro di corrispondenza vaga (specificato dal lato consumer), se il routing key contiene la parola chiave specificata, allora il msg verrà inviato alla queue associata.
Regole di corrispondenza del simbolo wildcard di rabbitmq:
Il simbolo ‘#’ corrisponde a una o più parole, il simbolo ‘’ corrisponde a una parola. Pertanto ‘abc.#’ può corrispondere a ‘abc.m.n’, ma ‘abc.*’ corrisponderà solo a ‘abc.m’. Il ‘.’ è il simbolo di separazione. Durante l'uso dei segni di asterisco deve essere utilizzato il ‘.’ come simbolo di separazione.
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='topic_log', exchange_type='topic', ) while True: route_key=input('Input routing key:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='topic_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='topic_log', exchange_type='topic' ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Input routing key:') channel.queue_bind( queue=queue_name, exchange='topic_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[type:%s],[msg:%s]' %(route_key,body)) ch.basic_ack(delivery_tag=deliver.delivery_tag) chann.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
Effetto di esecuzione:
La breve presentazione dei tre modelli publish/subscribe di rabbitmq è stata completata.
Questa guida completa su python queue communication: l'uso di rabbitMQ (esempio illustrato) è tutto ciò che l'autore ha condiviso con voi. Spero che possa essere utile come riferimento e spero che possiate sostenere il tutorial.
Dichiarazione: il contenuto di questo articolo è stato prelevato da Internet, di proprietà del rispettivo autore. Il contenuto è stato fornito volontariamente dagli utenti di Internet e caricato autonomamente. Questo sito non detiene i diritti di proprietà e non ha effettuato alcuna modifica editoriale, né assume alcuna responsabilità legale. Se trovi contenuti che violano i diritti d'autore, ti preghiamo di inviare una email a notice#oldtoolbag.com (sostituisci # con @) per segnalare il problema, fornendo prove pertinenti. Una volta verificata la veridicità, il sito eliminerà immediatamente i contenuti contestati.