線程池
通過建立池可以有效的利用系統資源,節約系統性能。Java 中的線程池就是一種非常好的實現,從 JDK1.5 開始 Java 提供了一個線程工廠 Executors 用來生成線程池,通過 Executors 可以方便的生成不同類型的線程池。
線程池的優點
- 降低資源消耗。線程的開啟和銷毀會消耗資源,通過重複利用已創建的線程降低線程創建和銷毀造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。
常見的線程池
- CachedThreadPool:可緩存的線程池,該線程池中沒有核心線程,非核心線程的數量為 Integer.max_value,就是無限大,當有需要時創建線程來執行任務,沒有需要時回收線程,適用於耗時少,任務量大的情況。
- SecudleThreadPool:周期性執行任務的線程池,按照某種特定的計劃執行線程中的任務,有核心線程,但也有非核心線程,非核心線程的大小也為無限大。適用於執行周期性的任務。
- SingleThreadPool:只有一條線程來執行任務,適用於有順序的任務的應用場景。
- FixedThreadPool:定長的線程池,有核心線程,核心線程的即為最大的線程數量,沒有非核心線程
- Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor() 和 Executors.newCachedThreadPool() 等方法的底層都是通過 ThreadPoolExecutor 實現的。
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
// maximumPoolSize 必須大於 0,且必須大於 corePoolSize
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
參數介紹:
-
corePoolSize
- 線程池的核心線程數。在沒有設置 allowCoreThreadTimeOut 為 true 的情況下,核心線程會在線程池中一直存活,即使處於閑置狀態。
- 如果設置為 0,則表示在沒有任何任務時,銷毀線程池;如果大於 0,即使沒有任務時也會保證線程池的線程數量等於此值。
- 但需要注意,此值如果設置的比較小,則會頻繁的創建和銷毀線程,如果設置的比較大,則會浪費系統資源,所以需要根據自己的實際業務來調整此值。
-
maximumPoolSize
- 線程池所能容納的最大線程數。當活動線程(核心線程+非核心線程)達到這個數值后,後續任務將會根據 RejectedExecutionHandler 來進行拒絕策略處理。
- 官方規定此值必須大於 0,也必須大於等於 corePoolSize,此值只有在任務比較多,且不能存放在任務隊列時,才會用到。
-
keepAliveTime
- 非核心線程閑置時的超時時長。超過該時長,非核心線程就會被回收。
- 若線程池通過 allowCoreThreadTimeOut() 方法設置 allowCoreThreadTimeOut 屬性為 true,則該時長同樣會作用於核心線程,AsyncTask 配置的線程池就是這樣設置的。
-
unit
- keepAliveTime 時長對應的單位。
-
workQueue
- 表示線程池執行的任務隊列,當線程池的所有線程都在處理任務時,如果來了新任務就會緩存到此任務隊列中排隊等待執行。
- 是一個阻塞隊列 BlockingQueue,雖然它是 Queue 的子接口,但是它的主要作用並不是容器,而是作為線程同步的工具,他有一個特徵,當生產者試圖向 BlockingQueue 放入(put)元素,如果隊列已滿,則該線程被阻塞;當消費者試圖從 BlockingQueue 取出(take)元素,如果隊列已空,則該線程被阻塞。
-
ThreadFactory
- 線程的創建工廠,功能很簡單,就是為線程池提供創建新線程的功能。
- 也可以自定義一個線程工廠,通過實現 ThreadFactory 接口來完成,這樣就可以自定義線程的名稱或線程執行的優先級了。
- 通常在創建線程池時不指定此參數,它會使用默認的線程創建工廠的方法來創建線程,源代碼如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { // Executors.defaultThreadFactory() 為默認的線程創建工廠 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } // 默認的線程創建工廠,需要實現 ThreadFactory 接口 static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } // 創建線程 public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); // 創建一個非守護線程 if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); // 線程優先級設置為默認值 return t; } } -
RejectedExecutionHandler
- 表示指定線程池的拒絕策略,當線程池的任務已經在緩存隊列 workQueue 中存儲滿了之後,並且不能創建新的線程來執行此任務時,就會用到此拒絕策略.
- 它屬於一種限流保護的機制,這裡有四種任務拒絕類型:
- AbortPolicy: 不執行新任務,直接拋出異常,提示線程池已滿,涉及到該異常的任務也不會被執行,線程池默認的拒絕策略就是該策略。
- DisCardPolicy: 不執行新任務,也不拋出異常,即忽略此任務;
- DisCardOldSetPolicy: 將消息隊列中的第一個任務(即等待時間最久的任務)替換為當前新進來的任務執行,忽略最早的任務(最先加入隊列的任務);
- CallerRunsPolicy: 把任務交給當前線程來執行;
/** * 線程池的拒絕策略 */ @Test public void test1() { // 創建線程池 核心線程為1,最大線程為3,任務隊列大小為2 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 3, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2), new ThreadPoolExecutor.AbortPolicy() // 添加 AbortPolicy 拒絕策略 ); for (int i = 0; i < 6; i++) { poolExecutor.execute(() -> { System.out.println(Thread.currentThread().getName()); }); } }- 自定義線程池拒絕策略
/** * 自定義線程池的拒絕策略 * 實現接口 RejectedExecutionHandler */ @Test public void test2() { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 業務處理方法 System.out.println("執行自定義拒絕策略"); } } ); for (int i = 0; i < 6; i++) { executor.execute(() -> { System.out.println(Thread.currentThread().getName()); }); } }
線程池工作原理
線程池的工作流程要從它的執行方法 execute() 說起,源碼如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 當前工作的線程數小於核心線程數
if (workerCountOf(c) < corePoolSize) {
// 創建新的線程執行此任務
if (addWorker(command, true))
return;
c = ctl.get();
}
// 檢查線程池是否處於運行狀態,如果是則把任務添加到隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再出檢查線程池是否處於運行狀態,防止在第一次校驗通過後線程池關閉
// 如果是非運行狀態,則將剛加入隊列的任務移除
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池的線程數為 0 時(當 corePoolSize 設置為 0 時會發生)
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 新建線程執行任務
}
// 核心線程都在忙且隊列都已爆滿,嘗試新啟動一個線程執行失敗
else if (!addWorker(command, false))
// 執行拒絕策略
reject(command);
}
execute() VS submit()
- execute() 和 submit() 都是用來執行線程池任務的,它們最主要的區別是,submit() 方法可以接收線程池執行的返回值,而 execute() 不能接收返回值。
- sumbit 之所以可以接收返回值,是因為參數中可以傳遞:Callable task,而通過 callable 創建的線程任務有返回值並且可以拋出異常。
/**
* execute VS sumbin
* execute 提交任務沒有返回值
* submit 提交任務有返回值
*/
@Test
public void test3() throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20));
// execute
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("Hello, execute");
}
});
// submit 使用
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("Hello, submit");
return "submit success";
}
});
System.out.println(future.get());
}
- 它們的另一個區別是 execute() 方法屬於 Executor 接口的方法,而 submit() 方法則是屬於 ExecutorService 接口的方法。
線程池的使用:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author xiandongxie
*/
public class ThreadPool {
//參數初始化 返回Java虛擬機可用的處理器數量
// private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CPU_COUNT = 2;
//核心線程數量大小
private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//線程池最大容納線程數
private static final int maximumPoolSize = CPU_COUNT * 2 + 1;
//線程空閑后的存活時長
private static final int keepAliveTime = 30;
//任務過多后,存儲任務的一個阻塞隊列
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
//線程的創建工廠
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AdvacnedAsyncTask #" + mCount.getAndIncrement());
}
};
//線程池任務滿載后採取的任務拒絕策略: 不執行新任務,直接拋出異常,提示線程池已滿
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
//線程池對象,創建線程
ThreadPoolExecutor mExecute = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
rejectHandler
);
public static void main(String[] args) {
System.out.println("main start ..... \nCPU_COUNT = " + CPU_COUNT + "\tcorePoolSize=" + corePoolSize + "\tmaximumPoolSize=" + maximumPoolSize);
ThreadPool threadPool = new ThreadPool();
ThreadPoolExecutor execute = threadPool.mExecute;
// 預啟動所有核心線程
execute.prestartAllCoreThreads();
for (int i = 0; i < 5; i++) {
execute.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "\tstart..." + System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\tend..." + System.currentTimeMillis());
}
});
}
execute.shutdown();
System.out.println("main end .....");
}
}
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面
※網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!
※想知道最厲害的網頁設計公司“嚨底家”!
※別再煩惱如何寫文案,掌握八大原則!
※產品缺大量曝光嗎?你需要的是一流包裝設計!
※聚甘新