English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Python concorrente 2: utilizzo di asyncio per la gestione delle operazioni concorrenti

asyncio

Nel periodo di Python 2, il programmazione di rete ad alta prestazione era principalmente utilizzata con le librerie Twisted, Tornado e Gevent, ma i loro codici asincroni non erano compatibili tra loro e non potevano essere trasportati. Come descritto nella sezione precedente, Gvanrossum voleva implementare una libreria di coroutine nativa basata su generatori in Python 3, che avesse già integrato il supporto per l'IO asincrono, ovvero asyncio, che è stato introdotto nella libreria standard in Python 3.4.

Il pacchetto asyncio utilizza coroutine guidate dal ciclo di eventi per implementare la concorrenza.

Prima di essere inclusa nella libreria standard, il pacchetto asyncio aveva il codice nome “Tulip”(Tulipano),quindi durante la ricerca di informazioni su Internet, è spesso possibile vedere questo nome fiore.

Cos'è il ciclo di eventi?

Sul wiki si dice che il ciclo di eventi è “una struttura di programmazione che aspetta che il programma assegni eventi o messaggi”. In sintesi, il ciclo di eventi è: “quando A accade, esegui B”. Oppure, per spiegare questo concetto con un esempio semplice, è come il ciclo di eventi JavaScript presente in ogni browser. Quando fai clic su qualcosa (“quando A accade”), questo clic viene inviato al ciclo di eventi JavaScript e verificato se esiste un callback onclick registrato per gestire questo clic (eseguire B). Qualsiasi callback registrato viene eseguito con i dettagli dell'azione di clic.

Per Python, la libreria asyncio, utilizzata per fornire un ciclo di eventi, è stata aggiunta alla libreria standard. asyncio si concentra principalmente sul risolvere problemi nei servizi di rete, dove il ciclo di eventi viene fornito dalla disponibilità di I/O dal socket (socket) pronto per leggere e / o scrivere come “quando A accade” (attraverso il modulo selectors). Oltre al GUI e all'I/O, il ciclo di eventi viene spesso utilizzato per eseguire codice in altri thread o sotto-processi, utilizzando il ciclo di eventi come meccanismo di regolazione (ad esempio, multiprocessing cooperativo). Se capisci il GIL di Python, il ciclo di eventi è molto utile per quei luoghi che necessitano di rilasciare il GIL.

Thread e coroutines

Prima di tutto, guardiamo due porzioni di codice, una realizzata con il modulo threading e l'altra con il pacchetto asyncio.

# sinner_thread.py
import threading
import itertools
import time
import sys
class Signal: # Questa classe definisce un oggetto mutabile utilizzato per controllare il thread dall'esterno
 go = True
def spin(msg, signal): # Questa funzione viene eseguita in un thread separato, il parametro signal è un'istanza della classe Signal definita in precedenza
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|/-\\'): # itertools.cycle 函数从指定的序列中反复不断地生成元素
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x08' * len(status)) # 使用退格符把光标移回行首
  time.sleep(0.1) # Aggiorna ogni 0.1 secondo
  if not signal.go: # Se l'attributo go non è True, esce dal ciclo
   break
 write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头
def slow_function(): # Simula un'operazione che richiede molto tempo
 # 假装等待I/O一段时间
 time.sleep(3) # La chiamata a sleep blocca il thread principale, questo viene fatto per liberare il GIL e creare un thread secondario
 return 42
def supervisor(): # Questa funzione imposta il thread secondario, mostra l'oggetto thread, esegue il calcolo del tempo di esecuzione e infine uccide il processo
 signal = Signal()
 spinner = threading.Thread(target=spin,
        args=('thinking!', signal))
 print('oggetto spinner:', spinner) # Mostra l'oggetto thread spinner: <Thread(Thread-1, iniziale)>
 spinner.start() # Avvia il processo secondario
 result = slow_function() # Eseguiamo la funzione slow_function, bloccando il thread principale. Allo stesso tempo, il thread丛书 con un'animazione gira il puntatore
 signal.go = False
 spinner.join() # Attendere che il thread spinner finisca
 return result
def main():
 result = supervisor() 
 print('Answer', result)
if __name__ == '__main__':
 main()

Eseguiamo questo, il risultato sarà più o meno così:

Questo è un'animazione, la linea prima di 'thinking' è in movimento (per registrare lo schermo, ho aumentato il tempo di sleep)

Python non offre un'API per terminare i thread, quindi per chiudere un thread, è necessario inviare un messaggio al thread. Qui utilizziamo l'attributo signal.go: nel thread principale, lo impostiamo su False, quindi il thread spinner riceverà e uscirà

ora guardiamo la versione che utilizza il pacchetto asyncio:

# spinner_asyncio.py
# 通过协程以动画的形式显示文本式旋转指针
import asyncio
import itertools
import sys
@asyncio.coroutine # 打算交给asyncio 处理的协程要使用 @asyncio.coroutine 装饰
def spin(msg):
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|/-\\'): # itertools.cycle 函数从指定的序列中反复不断地生成元素
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x08' * len(status)) # 使用退格符把光标移回行首
  try:
   yield from asyncio.sleep(0.1) # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 这样的休眠不会阻塞事件循环
  except asyncio.CancelledError: # 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求
   break
 write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头
@asyncio.coroutine
def slow_function(): # 5 现在此函数是协程,使用休眠假装进行I/O 操作时,使用 yield from 继续执行事件循环
 # 假装等待I/O一段时间
 yield from asyncio.sleep(3) # 此表达式把控制权交给主循环,在休眠结束后回复这个协程
 return 42
@asyncio.coroutine
def supervisor(): #这个函数也是协程,因此可以使用 yield from 驱动 slow_function
 spinner = asyncio.async(spin('thinking!')) # asyncio.async() 函数排定协程的运行时间,使用一个 Task 对象包装spin 协程,并立即返回
 print('oggetto spinner:', spinner) # Oggetto Task, output simile a spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
 # Guida la funzione slow_function(), ottieni il valore di ritorno dopo che è terminata. Allo stesso tempo, il ciclo degli eventi continua a eseguire
 # Poiché la funzione slow_function utilizza l'espressione yield from asyncio.sleep(3) per passare il controllo al ciclo principale
 result = yield from slow_function()
 # L'oggetto Task può essere cancellato; dopo la cancellazione, viene sollevata l'eccezione asyncio.CancelledError nel punto di yield corrente del coroutine
 # Il coroutine può catturare questa eccezione, può ritardare la cancellazione, o rifiutare la cancellazione
 spinner.cancel()
 return result
def main():
 loop = asyncio.get_event_loop() # Ottieni il riferimento al ciclo degli eventi
 # Guida il coroutine supervisor, fino a che non è completato; il valore di ritorno di questo coroutine è il valore di questa chiamata
 result = loop.run_until_complete(supervisor())
 loop.close()
 print('Answer', result)
if __name__ == '__main__':
 main()

A meno che non si voglia bloccare la thread principale, congelando il ciclo degli eventi o l'applicazione intera, non utilizzare time.sleep() nei coroutines di asyncio.

Se il coroutine deve fare nulla per un periodo di tempo, dovrebbe utilizzare yield from asyncio.sleep(DELAY)

L'uso del decoratore @asyncio.coroutine non è obbligatorio, ma è consigliato farlo perché può evidenziare i coroutines nel codice, se non ha prodotto un valore, il coroutine recupera la raccolta dei rifiuti (il che significa che l'operazione non è stata completata, potrebbe esserci un difetto) e può emettere un avviso. Questo decoratore non attiva il coroutine.

I risultati dell'esecuzione di questi due segmenti di codice sono fondamentalmente identici, ora guardiamo le principali differenze nel codice core supervisor:

  1. L'oggetto asyncio.Task è quasi equivalente all'oggetto threading.Thread (l'oggetto Task è come una green thread di un'implementazione di libreria di multitasking in scrittura
  2. L'oggetto Task viene utilizzato per guidare le iterazioni, mentre l'oggetto Thread viene utilizzato per chiamare gli oggetti invocabili
  3. L'oggetto Task non viene istanziato autonomamente, ma viene ottenuto passando l'iterazione alla funzione asyncio.async(...) o al metodo loop.create_task(...)
  4. L'oggetto Task ottenuto ha già pianificato il tempo di esecuzione; l'istanza di Thread deve chiamare il metodo start per informarla chiaramente di eseguire
  5. Nella versione di thread del funzione supervisor, slow_function è una funzione comune, chiamata direttamente dal thread, mentre la funzione slow_function della versione asincrona è una iterazione, guidata da yield from.
  6. Non esiste API che possa terminare un thread dall'esterno, perché i thread possono essere interrotti in qualsiasi momento. E se si desidera terminare un task, si può utilizzare il metodo di istanza Task.cancel(), lanciare un'eccezione CancelledError all'interno della iterazione. Le iterazioni possono catturare questa eccezione nel punto di yield sospeso e gestire la richiesta di terminazione
  7. Le iterazioni di supervisor devono essere eseguite nel metodo main con la funzione loop.run_until_complete.

Una delle principali vantaggi delle iterazioni rispetto ai thread è che i thread devono ricordare di mantenere il blocco di sicurezza, per proteggere le parti importanti del programma, prevenire che le operazioni multi-pass siano interrotte durante l'esecuzione e prevenire che l'acqua sia in uno stato di Xiaoguang. Le iterazioni di default faranno una buona protezione, dobbiamo esplicitamente produrre (usando yield o yield from per cedere il controllo) per far funzionare il resto del programma.

asyncio.Future: intenzionalmente non bloccante

L'interfaccia di asyncio.Future è基本上与 concurrent.futures.Future identica, ma il modo di implementazione è diverso e non può essere scambiata.

Nell'articolo precedente [python concurrency 1: utilizzo di futures per la gestione della concurrency]() abbiamo introdotto il futuro di concurrent.futures.Future, in concurrent.futures.Future, il futuro è solo il risultato della pianificazione dell'esecuzione di qualcosa. Nel pacchetto asyncio, il metodo BaseEventLoop.create_task(...) accetta un'iterazione, pianifica il suo tempo di esecuzione e poi restituisce un'istanza di asyncio.Task (che è anche un'istanza della classe asyncio.Future, poiché Task è un sottoclasse di Future, utilizzata per avvolgere iterazioni. (Nella versione di concurrent.futures.Future, l'operazione simile è Executor.submit(...)).

Simile alla classe concurrent.futures.Future, la classe asyncio.Future fornisce

  1. .done() restituisce un valore booleano che indica se il Future è stato eseguito
  2. Il metodo .add_done_callback() accetta un singolo parametro, un oggetto invocabile, e il Future chiama questo oggetto quando il Future è stato eseguito.
  3. Il metodo .result() non accetta parametri, quindi non è possibile specificare il tempo di scadenza. Se si chiama il metodo .result() e il periodo di tempo non è ancora stato eseguito, viene lanciata l'eccezione asyncio.InvalidStateError.

Quando si chiama result() sulla classe Future di concurrent.futures, dopo che il Future è stato eseguito, restituisce il risultato dell'oggetto invocabile o l'eccezione lanciata durante l'esecuzione dell'oggetto invocabile, se si chiama il metodo f.result() prima che il Future sia eseguito, il metodo blocca la thread chiamante fino a quando non viene restituito un risultato. In questo caso, il metodo result può accettare un parametro timeout, se il Future non viene eseguito entro il tempo specificato, viene lanciata l'eccezione TimeoutError.

Quando si utilizza asyncio.Future, di solito si utilizza yield from per ottenere i risultati, piuttosto che utilizzare il metodo result() Il'espressione yield from genera il valore di ritorno nella coroutine in pausa e riprende il processo di esecuzione.

L'obiettivo della classe asyncio.Future è di essere utilizzata insieme a yield from, quindi di solito non è necessario utilizzare i seguenti metodi:

  1. Non è necessario chiamare my_future.add_down_callback(...), poiché è possibile direttamente inserire le operazioni che si desidera eseguire alla fine dell'esecuzione del future nel corpo del coroutine dopo l'espressione yield from my_future. (Poiché le coroutine possono sospendere e riprendere la funzione)
  2. Non è necessario chiamare my_future.result(), poiché il risultato prodotto da yield from è (result = yield from my_future)

Nel pacchetto asyncio, è possibile utilizzare yield from per produrre risultati da un oggetto asyncio.Future. Questo significa che possiamo scrivere così:

res = yield from foo() # foo può essere una funzione coroutine, o una funzione comune che restituisce un'istanza di Future o task

Funzione asyncio.async(...)*

asyncio.async(coro_or_future, *, loop=None)

Questa funzione unifica le coroutine e i Future: il primo parametro può essere uno di entrambi. Se è un oggetto Future o Task, viene restituito direttamente, se è una coroutine, la funzione async chiama automaticamente il metodo loop.create_task(...) per creare un oggetto Task. Il parametro loop è opzionale e viene utilizzato per passare un loop di eventi; se non viene fornito, la funzione async recupera l'oggetto loop chiamando la funzione asyncio.get_event_loop().

BaseEventLoop.create_task(coro)

Questo metodo pianifica il tempo di esecuzione delle coroutines e restituisce un oggetto asyncio.Task. Se viene chiamato su una sottoclasse di BaseEventLoop personalizzata, l'oggetto restituito potrebbe essere un'istanza di qualche classe compatibile con Task di un'estensione esterna.

Il metodo BaseEventLoop.create_task() è disponibile solo a partire dalla versione Python 3.4.2. Python 3.3 può utilizzare solo la funzione asyncio.async(...).
Se si desidera sperimentare future e coroutines nel console di Python o in script di test di piccola dimensione, è possibile utilizzare il seguente segmento:

import asyncio
def run_sync(coro_or_future):
 loop = asyncio.get_event_loop()
 return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())

Scaricare utilizzando asyncio e il pacchetto aiohttp

Ora che abbiamo compreso le basi di asyncio, è il momento di riscrivere lo script per scaricare la bandiera utilizzando asyncio, come fatto nel nostro articolo precedente [python concurrency 1: using futures to handle concurrency]()

Ecco un esempio di codice:

import asyncio
import aiohttp # È necessario installare aiohttp con pip
from flags import save_flag, show, main, BASE_URL
#asyncio.coroutine # Conosciamo che le coroutines dovrebbero essere decorate con asyncio.coroutine
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
  # Le operazioni bloccanti sono implementate tramite coroutine, il codice del cliente delega la responsabilità alle coroutine tramite yield from per eseguire operazioni asincrone
 resp = yield from aiohttp.request('GET', url) 
 # La lettura è anche un'operazione asincrona
 image = yield from resp.read()
 return image
@asyncio.coroutine
def download_one(cc): # Questa funzione deve essere una coroutine perché utilizza yield from
 image = yield from get_flag(cc) 
 show(cc)
 save_flag(image, cc.lower() + '.gif')
 return cc
def download_many(cc_list):
 loop = asyncio.get_event_loop() # Recupera il riferimento all'implementazione di basso livello del numero di eventi
 to_do = [download_one(cc) for cc in sorted(cc_list)] # Chiama download_one per ottenere ciascuna bandiera e costruisce una lista di oggetti generatori
 # Anche se il nome della funzione è wait, non è una funzione bloccante, wait è una coroutine che termina quando tutte le coroutine trasmesse a essa sono state eseguite
 wait_coro = asyncio.wait(to_do)
 res, _ = loop.run_until_complete(wait_coro) # Esegue il loop di eventi fino a che wait_coro non è completato; durante l'esecuzione del loop di eventi, questo script si blocca qui.
 loop.close() # Chiude il loop di eventi
 return len(res)
if __name__ == '__main__':
 main(download_many)

La descrizione sommaria del funzionamento di questo codice è la seguente:

  1. Nella funzione download_many, viene recuperato un loop di eventi per gestire diversi oggetti coroutine generati dalla chiamata alla funzione download_one
  2. Il loop di eventi asyncio attiva ciascuna coroutine una volta
  3. Quando il codice del cliente utilizza yield from per delegare la responsabilità alla coroutine della libreria (aiohttp.request) nel codice del cliente (get_flag), il controllo viene restituito al loop di eventi per eseguire la coroutine pianificata in precedenza
  4. Il loop di eventi riceve notifiche dopo che l'operazione bloccante è stata completata tramite un API di basso livello basato su callback.
  5. Dopo aver ricevuto la notifica, il ciclo principale invia i risultati alla coroutine in pausa
  6. La coroutine procede fino alla seguente espressione yield from, ad esempio get_flag(yield from resp.read()). Il loop di eventi recupera nuovamente il controllo, ripetendo i passaggi 4-6 fino a che il ciclo termina.

Nella funzione download_many, abbiamo utilizzato la funzione asyncio.wait(...), che è una coroutine, i parametri della coroutine sono un oggetto iterabile composto da future o coroutine; wait impacchetta ciascuna coroutine in un oggetto Task. Il risultato finale è che tutti gli oggetti elaborati da wait diventano istanze della classe Future.

wait è una funzione coroutine, quindi restituisce un oggetto coroutine o generator. L'oggetto variabile waite_coro contiene questo tipo di oggetto

Il metodo loop.run_until_complete riceve come parametro un future o una coroutine. Se è una coroutine, il metodo run_until_complete è uguale alla funzione wait, che impacchetta la coroutine in un oggetto Task. In questo caso, il metodo run_until_complete impacchetta wait_coro in un oggetto Task e lo guida tramite yield from. Dopo che wait_coro termina, restituisce due parametri: il primo è il future completato e il secondo è il future non completato.

<section class="caption">wait</section> ha due parametri nomeati, timeout e return_when, che possono restituire future non completate se impostati.

Hai anche notato che abbiamo riscritto la funzione get_flags, perché la libreria requests che usavamo prima eseguiva operazioni di I/O bloccanti. Per utilizzare il pacchetto asyncio, dobbiamo trasformare la funzione in una versione asincrona.

Trucchi

Se pensi che il codice diventi difficile da comprendere dopo l'uso delle coroutine, puoi seguire il consiglio del padre di Python (Guido van Rossum) e immaginare di non avere yield from.

Prendiamo questo pezzo di codice come esempio:

@asyncio.coroutine
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url) 
 image = yield from resp.read()
 return image
# Rimuovere yield from
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = aiohttp.request('GET', url) 
 image = resp.read()
 return image
# Ora tutto sembra più chiaro

Conoscenza

Quando si utilizza yield from nell'API del pacchetto asyncio, c'è un dettaglio da notare:

Quando si utilizza il pacchetto asyncio, il codice asincrono che scriviamo contiene coroutines (generatori delegati) alimentati dallo stesso asyncio, e alla fine delegano la responsabilità al pacchetto asyncio o a coroutine di terze parti. Questo metodo è equivalente a costruire un tubo che permette al ciclo di eventi asyncio di guidare l'esecuzione delle funzioni di I/O asincrone di basso livello.

Evitare le chiamate bloccanti

Prima di tutto, guardiamo un grafico che mostra la latenza di lettura dei dati dal computer da diversi mezzi di archiviazione:

Da questa immagine, possiamo vedere che le chiamate bloccanti sono un'enorme perdita di risorse per il CPU. Come possiamo evitare che le chiamate bloccanti fermino l'intero applicativo?

Ci sono due metodi:

  1. Le operazioni bloccanti vengono eseguite in un thread separato
  2. Convertire ogni operazione bloccante in chiamate asincrone non bloccanti

Naturalmente, raccomandiamo la seconda soluzione, perché la prima è troppo costosa se ogni connessione utilizza un thread.

Un altro modo per implementare la programmazione asincrona è utilizzare generatori come coroutines. Per il ciclo di eventi, chiamare un callback è quasi lo stesso che chiamare .send() su una coroutine in pausa. Le coroutine in pausa consumano molto meno memoria rispetto ai thread.

Ora, dovresti capire perché lo script flags_asyncio.py è molto più veloce di flags.py.

Poiché flags.py scarica in modo sincrono sequenziale, ogni download richiede decine di miliardi di cicli di CPU in attesa del risultato. In flags_asyncio.py, quando si chiama il metodo loop.run_until_complete nel metodo download_many, il ciclo di eventi guida i vari coroutines di download_one, fino a quando l'espressione yield from, che a sua volta guida i vari coroutines di get_flag, fino al primo espressione yield from, si chiama la funzione aiohttp.request(). Queste chiamate non bloccano, quindi in meno di un secondo tutte le richieste possono iniziare.

Miglioramento dello script di download asyncio

Ora miglioriamo flags_asyncio.py, aggiungendo la gestione delle eccezioni e il contatore

import asyncio
import collections
from collections import namedtuple
from enum import Enum
import aiohttp
from aiohttp import web
from flags import save_flag, show, main, BASE_URL
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
# Eccezione personalizzata per impacchettare altre eccezioni HTTP o di rete, e ottenere country_code per segnalare gli errori
class FetchError(Exception):
 def __init__(self, country_code):
  self.country_code = country_code
@asyncio.coroutine
def get_flag(cc):
 # Questa coroutine ha tre tipi di risultati di ritorno:
 # 1. Restituisce l'immagine scaricata
 # 2. Lancia l'eccezione web.HTTPNotFound quando la risposta HTTP è 404
 # 3. Lancia aiohttp.HttpProcessingError quando viene restituito un altro codice di stato HTTP
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url)
 se resp.status == 200:
  image = yield from resp.read()
  return image
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers
  )
@asyncio.coroutine
def download_one(cc, semaphore):
 # Il parametro semaphore è un'istanza della classe asyncio.Semaphore
 # La classe Semaphore è un dispositivo di sincronizzazione utilizzato per limitare le richieste concorrenti
 try:
  with (yield from semaphore):
    # Nell'espressione yield from, l'oggetto semaphore viene utilizzato come gestione contesto per evitare di bloccare l'intero sistema
    # Se il valore del contatore semaphore è il valore massimo permesso, solo questa coroutine verrà bloccata
    image = yield from get_flag(cc)
    # Dopo l'uscita dalla frase with, il valore del contatore semaphore si ridurrà
    # La sbloccatura potrebbe essere necessaria mentre altri oggetti coroutine in attesa stanno aspettando lo stesso oggetto semaphore
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  save_flag(image, cc.lower() + '.gif')
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)
@asyncio.coroutine
def downloader_coro(cc_list):
 counter = collections.Counter()
 # Creazione di un'istanza di asyncio.Semaphore che permette di attivare al massimo MAX_CONCUR_REQ coroutine che utilizzano questo contatore
 semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
 # Chiamata ripetuta a download_one coroutine, creazione di una lista di oggetti coroutine
 to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
 # Ottenere un iteratore che restituisce future una volta completato il future
 to_do_iter = asyncio.as_completed(to_do)
 for future in to_do_iter:
  # Iterazione dei future che possono essere terminati 
  try:
   res = yield from future # Ottenere il risultato dell'oggetto asyncio.Future (può anche chiamare future.result)
  except FetchError as exc:
   # Tutte le eccezioni sollevate vengono racchiuse nell'oggetto FetchError
   country_code = exc.country_code
   try:
    # Tentare di ottenere il messaggio di errore dall'eccezione originale (__cause__)
    error_msg = exc.__cause__.args[0]
   except IndexError:
    # Se non si trova il messaggio di errore nell'eccezione originale, utilizzare il nome della classe dell'eccezione connessa come messaggio di errore
    error_msg = exc.__cause__.__class__.__name__
   if error_msg:
    msg = '*** Error for {}: {}'
    print(msg.format(country_code, error_msg))
   status = HTTPStatus.error
  else:
   status = res.status
  counter[status] += 1
 return counter
def download_many(cc_list):
 loop = asyncio.get_event_loop()
 coro = downloader_coro(cc_list)
 counts = loop.run_until_complete(coro)
 loop.close()
 return counts
if __name__ == '__main__':
 main(download_many)

Poiché le coroutine iniziano richieste a velocità rapida, per prevenire un numero eccessivo di richieste concorrenti al server che potrebbe sovraccaricare il server, nella funzione download_coro creiamo un'istanza di asyncio.Semaphore e la passiamo alla funzione download_one.

<secion class="caption">Semaphore</section> oggetto mantiene un contatore interno, se si chiama il metodo .acquire() coroutine sull'oggetto, il contatore si riduce; se si chiama il metodo .release() coroutine sull'oggetto, il contatore si incrementa. Il valore del contatore è impostato durante l'inizializzazione.

Se il contatore è maggiore di 0, la chiamata al metodo .acquire() non blocca, se il contatore è 0, il metodo .acquire() blocca la coroutines che lo chiamano fino a quando altre coroutines chiamano il metodo .release() sullo stesso oggetto Semaphore, incrementando il contatore.

Nel codice superiore, non abbiamo chiamato manualmente i metodi .acquire() o .release(), ma abbiamo utilizzato il semaphore come gestore di contesto all'interno della funzione download_one:

with (yield from semaphore):
 image = yield from get_flag(cc)

Questo codice garantisce che non ci siano mai più di MAX_CONCUR_REQ coroutine get_flag in esecuzione.

Usare la funzione asyncio.as_completed

Poiché è necessario utilizzare yield from per ottenere il risultato del future prodotto dalla funzione asyncio.as_completed, la funzione as_completed deve essere chiamata all'interno della coroutine. Poiché download_many deve essere passato come parametro alla funzione main non coroutine, ho aggiunto una nuova coroutine downloader_coro, in modo che la funzione download_many venga utilizzata solo per configurare l'anello di eventi.

Usare l'oggetto Executor per prevenire i blocchi dell'anello di eventi

Torniamo a guardare la grafica della latenza di lettura del computer da diversi supporti di archiviazione, c'è una cosa da notare in tempo reale, ovvero l'accesso al file system locale può bloccare.

Nel codice precedente, la funzione save_flag blocca il thread condiviso tra il codice del cliente e l'anello di eventi asyncio, quindi quando si salvano i file, l'intero applicativo si ferma. Per evitare questo problema, si può utilizzare il metodo run_in_executor dell'oggetto dell'anello di eventi.

L'anello di eventi asyncio mantiene in background un oggetto ThreadPoolExecutor, possiamo chiamare il metodo run_in_executor per inviare l'oggetto invocabile a esso per l'esecuzione.

Di seguito è riportato il codice modificato:

@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  # Ecco la parte modificata
  loop = asyncio.get_event_loop() # Ottieni il riferimento all'anello di eventi
  loop.run_in_executor(None, save_flag, image, cc.lower() + '.gif')
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

Il primo parametro del metodo run_in_executor è l'istanza di Executor; se impostato su None, viene utilizzato l'istanza predefinita ThreadPoolExecutor dell'anello di eventi.

Dalla callback alla future alla coroutine

Prima di toccare le coroutines, potremmo avere una certa conoscenza delle callback, allora cosa migliora la coroutine rispetto alle callback?

stile di codice di callback in Python:

def stage1(response1):
 request2 = step1(response1)
 api_call2(request2, stage2)
def stage2(response2):
 request3 = step3(response3)
 api_call3(request3, stage3) 
 def stage3(response3):
  step3(response3) 
api_call1(request1, stage1)

I difetti del codice sopra:

  1. È facile entrare in un inferno di callback
  2. Il codice è difficile da leggere

In questo problema, le coroutines possono fare una grande differenza. Se si sostituisce il codice asincrono fatto con coroutines e yield from, l'esempio di codice è il seguente:

@asyncio.coroutine
def three_stages(request1):
 response1 = yield from api_call1(request1)
 request2 = step1(response1)
 response2 = yield from api_call2(requests)
 request3 = step2(response2)
 response3 = yield from api_call3(requests)
 step3(response3) 
loop.create_task(three_stages(request1)

Confrontato con il codice precedente, questo codice è molto più facile da comprendere. Se la chiamata asincrona api_call1, api_call2, api_call3 lancia un'eccezione, allora è possibile mettere le espressioni yield from corrispondenti nel blocco try/except per gestire l'eccezione.

È necessario abituarsi all'espressione yield from quando si usano le coroutines, e le coroutines non possono essere chiamate direttamente; devono essere programmate esplicitamente il tempo di esecuzione, o essere attivate utilizzando l'espressione yield from in altre coroutines programmate per il tempo di esecuzione. Se non si utilizza loop.create_task(three_stages(request1)), non accadrà nulla.

Di seguito, mostriamo un esempio pratico:

Ogni volta che si scarica, vengono inviate più richieste

Modifichiamo il codice per scaricare la bandiera, in modo che contemporaneamente possiamo ottenere il nome della nazione e usarlo nel salvataggio dell'immagine.
Usiamo le coroutines e yield from per risolvere questo problema:

@asyncio.coroutine
def http_get(url):
 resp = yield from aiohttp.request('GET', url)
 se resp.status == 200:
  ctype = resp.headers.get('Content-type', '').lower()
  se 'json' è presente in ctype o url.endswith('json'):
   data = yield from resp.json()
  else:
   data = yield from resp.read()
  return data
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers)
@asyncio.coroutine
def get_country(cc):
 url = "{}/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
 metadata = yield from http_get(url)
 return metadata['country']
@asyncio.coroutine
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 return (yield from http_get(url))
@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
  with (yield from semaphore):
   country = yield from get_country(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  country = country.replace(' ', '_')
  filename = '{}--{}.gif'.format(country, cc)
  print(filename)
  loop = asyncio.get_event_loop()
  loop.run_in_executor(None, save_flag, image, filename)
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

In questo pezzo di codice, chiamiamo get_flag e get_country all'interno di due blocchi with controllati da semaphore nel download_one funzione per risparmiare tempo.

Il return della statement get_flag viene aggiunto all'esterno degli apici perché il precedence dell'operatore () è alto, esegue prima la statement yield from all'interno degli apici. Senza aggiungere () si genererà un errore sintattico

Aggiungere () è equivalente a

image = yield from http_get(url)
return image

Se non si aggiunge (), il programma si interromperà al punto di yield from, cedendo il controllo, a questo punto l'uso di return genererà un errore sintattico.

Conclusione

In questo articolo, abbiamo discusso:

  1. Confronta un programma multithreading e una versione asyncio, spiegando la relazione tra threading e task asincroni
  2. Confronta le differenze tra la classe asyncio.Future e la classe concurrent.futures.Future
  3. Come utilizzare la programmazione asincrona per gestire la alta concorrenza nelle applicazioni di rete
  4. In programmazione asincrona, rispetto ai callback, le coroutines migliorano significativamente le prestazioni

Questo è tutto il contenuto dell'articolo, speriamo che sia utile per la tua apprendimento, e speriamo che tutti supportino e gridino le lezioni.

Dichiarazione: il contenuto di questo articolo è stato tratto da Internet, il copyright spetta ai rispettivi autori, il contenuto è stato contribuito e caricato volontariamente dagli utenti di Internet, questo sito non detiene il diritto di proprietà, non è stato editato manualmente e non assume responsabilità per le relative responsabilità legali. Se trovi contenuti sospetti di violazione del copyright, sei invitato a inviare una email a: notice#oldtoolbag.com (al momento dell'invio dell'email, sostituisci # con @) per segnalare il problema e fornire prove pertinenti. Una volta verificata, questo sito eliminerà immediatamente il contenuto sospetto di violazione del copyright.

Ti potrebbe interessare