English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
Contesto tecnico della pool di thread
Nella programmazione orientata agli oggetti, la creazione e la distruzione degli oggetti sono molto耗时的,perché per creare un oggetto è necessario ottenere risorse di memoria o altre risorse. Questo è ancora più vero in Java, dove il virtual machine cercherà di tracciare ogni oggetto per poter eseguire la raccolta dei rifiuti dopo la distruzione dell'oggetto.
所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些”池化资源”技术产生的原因。
例如Android中常见到的很多通用组件一般都离不开”池”的概念,如各种图片加载库,网络请求库,即使Android的消息传递机制中的Meaasge当使用Meaasge.obtain()就是使用的Meaasge池中的对象,因此这个概念很重要。本文将介绍的线程池技术同样符合这一思想。
线程池的优点:
1.重用线程池中的线程,减少因对象创建,销毁所带来的性能开销;
2.能有效的控制线程的最大并发数,提高系统资源利用率,同时避免过多的资源竞争,避免堵塞;
3.能够多线程进行简单的管理,使线程的使用简单、高效。
线程池框架Executor
java中的线程池是通过Executor框架实现的,Executor 框架包括类:Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable和Future、FutureTask的使用等。
Executor: 所有线程池的接口,只有一个方法。
public interface Executor { void execute(Runnable command); }
ExecutorService: 增加Executor的行为,是Executor实现类的最直接接口。
Executors:提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService 接口。
ThreadPoolExecutor:线程池的具体实现类,一般用的各种线程池都是基于这个类实现的。构造方法如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
corePoolSize: rappresenta il numero di thread di base della pool di thread, il numero di thread in esecuzione nella pool di thread non supererà mai corePoolSize, di default può rimanere attivo per sempre. È possibile impostare allowCoreThreadTimeOut a True, in questo caso il numero di thread di base è 0, a quel punto keepAliveTime controlla il tempo di scadenza di tutti i thread.
maximumPoolSize: rappresenta il numero massimo di thread che la pool di thread può permettere;
keepAliveTime: indica il tempo di scadenza per la fine di un thread inattivo;
unit: è un'enumerazione che rappresenta l'unità di keepAliveTime;
workQueue: rappresenta la coda di BlokingQueue<Runnable> per memorizzare le attività.
BlockingQueue: La coda bloccante (BlockingQueue) è uno strumento principale sotto java.util.concurrent utilizzato per controllare la sincronizzazione tra thread. Se la BlockQueue è vuota, l'operazione di prelievo da BlockingQueue viene bloccata in stato di attesa fino a che la BlockingQueue non riceve qualcosa, a quel punto verrà svegliata. Allo stesso modo, se la BlockingQueue è piena, qualsiasi tentativo di inserimento nella coda viene bloccato in stato di attesa fino a che la BlockingQueue ha spazio, a quel punto verrà svegliata per continuare l'operazione. La coda bloccante viene spesso utilizzata nelle scenari di produttore e consumatore, dove il produttore è il thread che aggiunge elementi alla coda e il consumatore è il thread che preleva elementi dalla coda. La coda bloccante è il contenitore in cui il produttore deposita gli elementi, e il consumatore preleva solo gli elementi dal contenitore. Esempi di implementazioni specifiche includono LinkedBlockingQueue, ArrayBlockingQueued ecc. Di solito, l'interno utilizza Lock e Condition (lo studio e l'uso di Lock e Condition) per implementare il blocco e il risveglio.
Il processo di lavoro della pool di thread è il seguente:
La pool di thread viene creata senza thread. La coda delle attività è passata come parametro. Tuttavia, anche se la coda contiene attività, la pool di thread non eseguirà immediatamente queste attività.
Quando si chiama il metodo execute() per aggiungere un'attività, la pool di thread esegue le seguenti verifiche:
Se il numero di thread in esecuzione è minore di corePoolSize, verrà creato immediatamente un thread per eseguire questa attività;
Se il numero di thread in esecuzione è maggiore o uguale a corePoolSize, questa attività verrà messa in coda;
Se in questo momento la coda è piena e il numero di thread in esecuzione è inferiore a maximumPoolSize, comunque, viene creato un thread non di base per eseguire immediatamente questa task;
Se la coda è piena e il numero di thread in esecuzione è maggiore o uguale a maximumPoolSize, il pool di thread lancia un'eccezione RejectExecutionException.
Quando un thread completa una task, prenderà la prossima task dalla coda per eseguirla.
Quando un thread non ha nulla da fare per un periodo di tempo superiore a una certa soglia (keepAliveTime), il pool di thread lo giudica, se il numero di thread in esecuzione è maggiore di corePoolSize, allora questo thread viene fermato. Pertanto, una volta completate tutte le task del pool di thread, esso si riduce alla dimensione di corePoolSize.
Creazione e utilizzo del pool di thread
La creazione del pool di thread utilizza i metodi statici della classe di utilità Executors, di seguito sono elencati alcuni tipi comuni di pool di thread.
SingleThreadExecutor: singolo thread di background (la sua coda di buffering è illimitata)
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Creare un pool di thread con un solo thread. Questo pool di thread ha solo un thread di base che lavora, che è equivalente a eseguire tutte le task in serie con un singolo thread. Se l'unico thread termina a causa di un'eccezione, verrà sostituito da un nuovo thread. Questo pool di thread garantisce che tutte le task vengano eseguite nell'ordine di submission delle task.
FixedThreadPool: pool di thread con solo thread di base, dimensione fissa (la sua coda di buffering è illimitata).
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Creare un pool di thread di dimensione fissa. Ogni volta che si invia un task viene creato un thread, fino a quando il numero di thread raggiunge la dimensione massima del pool. Una volta raggiunta la dimensione massima, il pool di thread mantiene la dimensione costante. Se un thread termina a causa di un'eccezione di esecuzione, il pool di thread aggiunge un nuovo thread.
CachedThreadPool:无界线程池,可以进行自动线程回收。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。SynchronousQueue是一个缓冲区为1的阻塞队列。
ScheduledThreadPool:核心线程池固定,大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
public static ExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPool(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在DEFAULT_KEEPALIVEMILLIS时间内回收。
线程池最常用的提交任务的方法有两种:
execute:
ExecutorService.execute(Runnable runable);
submit:
FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable, T Result);
FutureTask<T> task = ExecutorService.submit(Callable<T> callable);
submit(Callable callable)的实现,submit(Runnable runnable)同理。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); FutureTask<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
可以看出submit开启的是有返回结果的任务,会返回一个FutureTask对象,这样就能通过get()方法得到结果。submit最终调用的也是execute(Runnable runable),submit只是将Callable对象或Runnable封装成一个FutureTask对象,因为FutureTask是个Runnable,所以可以在execute中执行。关于Callable对象和Runnable怎么封装成FutureTask对象,见Callable和Future、FutureTask的使用。
线程池实现的原理
如果只讲线程池的使用,那这篇博客没有什么大的价值,充其量也就是熟悉Executor相关API的过程。线程池的实现过程没有用到Synchronized关键字,用的都是Volatile,Lock和同步(阻塞)队列,Atomic相关类,FutureTask等等,因为后者的性能更优。理解的过程可以很好的学习源码中并发控制的思想。
在开篇提到过线程池的优点是可总结为以下三点:
线程复用
控制最大并发数
管理线程
1.线程复用过程
理解线程复用原理首先应了解线程生命周期。
在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。
Thread通过new来新建一个线程,这个过程是初始化一些线程信息,如线程名,id,线程所属group等,可以认为只是个普通的对象。调用Thread的start()后Java虚拟机会为其创建方法调用栈和程序计数器,同时将hasBeenStarted为true,之后调用start方法就会有异常。
Le thread in questo stato non sono ancora in esecuzione, ma rappresentano solo che la thread può essere eseguita. Quando inizia l'esecuzione della thread dipende dallo scheduler dei thread nel JVM. Quando la thread ottiene il cpu, viene chiamato il metodo run(). Non chiamare mai il metodo run() della Thread. Dopo la gestione della CPU, viene effettuato lo switch tra pronto, in esecuzione e bloccato, fino a quando il metodo run() termina o la thread viene fermata in un altro modo, entrando nello stato dead.
Quindi, il principio di realizzazione della riutilizzazione dei thread dovrebbe essere mantenere lo stato della thread in vita (pronto, in esecuzione o bloccato). Ora vediamo come il ThreadPoolExecutor realizza la riutilizzazione dei thread.
Nel ThreadPoolExecutor, la classe Worker principale viene utilizzata per controllare la riutilizzazione dei thread. Guardiamo il codice semplificato della classe Worker, in modo da facilitarne la comprensione:
private final class Worker implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null){ task.run(); } }
Worker è un Runnable che possiede anche un thread, il thread che deve essere avviato. Quando si crea un oggetto Worker, si crea anche un oggetto Thread, e si passa Worker come parametro a TThread. In questo modo, quando viene chiamato il metodo start() del Thread, viene eseguito il metodo run() di Worker, poi si passa a runWorker(). C'è un ciclo while che continua a ottenere l'oggetto Runnable da getTask(), eseguendo in ordine. Come ottiene getTask() l'oggetto Runnable?
依旧是简化后的代码:
private Runnable getTask() { if(一些特殊情况) { return null; } Runnable r = workQueue.take(); return r; }
这个workQueue就是初始化ThreadPoolExecutor时存放任务的BlockingQueue队列,这个队列里的存放的都是将要执行的Runnable任务。因为BlockingQueue是个阻塞队列,BlockingQueue.take()得到如果是空,则进入等待状态直到BlockingQueue有新的对象被加入时唤醒阻塞的线程。所以一般情况Thread的run()方法就不会结束,而是不断执行从workQueue里的Runnable任务,这就达到了线程复用的原理了。
2.控制最大并发数
那Runnable是什么时候放入workQueue?Worker又是什么时候创建,Worker里的Thread的又是什么时候调用start()开启新线程来执行Worker的run()方法的呢?有上面的分析看出Worker里的runWorker()执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?
很容易想到是在execute(Runnable runnable)时会做上面的一些任务。看下execute里是怎么做的。
execute:
Codice semplificato
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 当前线程数 < corePoolSize if (workerCountOf(c) < corePoolSize) { // Avvia direttamente un nuovo thread. if (addWorker(command, true)) return; c = ctl.get(); } // Numero di thread attivi >= corePoolSize // runState è RUNNING e la coda non è piena if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // Verifica di nuovo lo stato RUNNING // Se non è in stato RUNNING, rimuovi l'attività dalla workQueue e rifiuta if (!isRunning(recheck) && remove(command)) reject(command); // Rifiuta l'attività secondo la strategia specificata dal pool di thread // Due situazioni: // Rifiuto di nuove attività in stato non RUNNING // Fallimento dell'avvio di un nuovo thread quando la coda è piena (workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }
addWorker:
Codice semplificato
private boolean addWorker(Runnable firstTask, boolean core) { int wc = workerCountOf(c); if (wc >= (core ? corePoolSize : maximumPoolSize)) { return false; } w = new Worker(firstTask); final Thread t = w.thread; t.start(); }
Secondo il codice, guardiamo la situazione di aggiunta di attività nel processo di lavoro del pool di thread menzionato sopra:
* Se il numero di thread in esecuzione è minore di corePoolSize, crea immediatamente un thread per eseguire questa attività;
* Se il numero di thread in esecuzione è maggiore o uguale a corePoolSize, metti questa attività nella coda;
* Se in questo momento la coda è piena e il numero di thread in esecuzione è minore di maximumPoolSize, comunque, è necessario creare un thread non di base per eseguire immediatamente questa attività;
* Se la coda è piena e il numero di thread in esecuzione è maggiore o uguale a maximumPoolSize, il pool di thread lancia un'eccezione RejectExecutionException.
Questo è il motivo per cui l'AsyncTask di Android lancia RejectExecutionException quando si esegue in parallelo e supera il numero massimo di compiti, dettagliati nella lettura del codice sorgente di AsyncTask della versione più recente e nel lato oscuro di AsyncTask.
Se addWorker ha successo nella creazione di una nuova thread, allora attraverso start() si avvia la nuova thread, e firstTask viene eseguita come prima attività nel run() di questo Worker.
Nonostante ogni Worker elabori i compiti in modo sequenziale, se vengono creati più Worker, poiché condividono lo stesso workQueue, verranno elaborati in modo parallelo.
Quindi, controllare il numero massimo di concorrenza in base a corePoolSize e maximumPoolSize. Il processo può essere rappresentato con la seguente immagine.
La spiegazione e l'immagine sopra possono aiutare a comprendere questo processo in modo chiaro.
Se si è impegnati nello sviluppo Android e si è abbastanza familiari con il principio di Handler, potresti trovare questa immagine piuttosto familiare, alcuni processi e Handler, Looper, Message sono molto simili. Handler.send(Message) è equivalente a execute(Runnable), la coda di Message mantenuta da Looper è equivalente a BlockingQueue, ma è necessario mantenere questa coda attraverso la sincronizzazione, il ciclo loop() di Looper esegue in modo ciclico dalla coda di Message e il runWork() di Worker continua a prendere Runnable dalla BlockingQueue con lo stesso principio.
3. Gestione delle thread
Grazie al thread pool è possibile gestire in modo efficace la riutilizzazione delle thread, controllare il numero di concorrenza e altri processi di distruzione, mentre la riutilizzazione e il controllo della concorrenza sono stati trattati, e il processo di gestione è stato intercalato, e anche è facilmente comprensibile.
In ThreadPoolExecutor c'è una variabile AtomicInteger chiamata ctl. Attraverso questa variabile vengono salvati due contenuti:
Il numero totale delle thread, lo stato in cui si trovano ogni thread, tra cui i 29 bit inferiori contengono il numero di thread, e i 3 bit superiori contengono il runState, per ottenere diversi valori tramite operazioni bitwise.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // Ottenere lo stato della thread private static int runStateOf(int c) { return c & ~CAPACITY; } // Ottieni il numero di Worker private static int workerCountOf(int c) { return c & CAPACITY; } // Giudizio se il thread è in esecuzione private static boolean isRunning(int c) { return c < SHUTDOWN; }
Questo passaggio analizza il processo di chiusura del pool di thread attraverso shutdown e shutdownNow(). In primo luogo, il pool di thread ha cinque stati per controllare l'aggiunta e l'esecuzione dei task. Introduco principalmente i seguenti tre:
Stato RUNNING: Il pool di thread funziona regolarmente, può accettare nuovi task e gestire le task nella coda;
Stato SHUTDOWN: Non accetta nuovi task, ma esegue le task nella coda;
Stato STOP: Non accetta nuovi task, non gestisce le task nella coda; il metodo shutdown imposta runState a SHUTDOWN, termina tutti i thread inattivi, mentre i thread che stanno lavorando non sono influenzati, quindi le task nella coda verranno eseguite.
Il metodo shutdownNow imposta runState a STOP. La differenza rispetto al metodo shutdown è che questo metodo termina tutti i thread, quindi le task nella coda non verranno eseguite.
Conclusione
Analizzando il codice sorgente di ThreadPoolExecutor, ho compreso globalmente il processo di creazione del pool di thread, l'aggiunta di task e l'esecuzione, familiarizzare con questi processi rende l'uso del pool di thread più facile.
E alcune delle esperienze acquisite durante la lezione possono essere molto utili per capire o risolvere altri problemi in futuro. Ad esempio, il meccanismo di Handler in Android, e l'uso della coda Messager in Looper con una BlookQueue per gestire può essere altrettanto efficace, questo è il frutto della lettura del codice sorgente.
Ecco la raccolta di materiali relativi al pool di thread Java, continueremo a integrare ulteriori materiali in futuro, grazie per il supporto a questo sito!