ThreadPoolExecutor線(xiàn)程池參數設置技巧
一、ThreadPoolExecutor的重要參數
corePoolSize:核心線(xiàn)程數
核心線(xiàn)程會(huì )一直存活,及時(shí)沒(méi)有任務(wù)需要執行
當線(xiàn)程數小于核心線(xiàn)程數時(shí),即使有線(xiàn)程空閑,線(xiàn)程池也會(huì )優(yōu)先創(chuàng )建新線(xiàn)程處理
設置allowCoreThreadTimeout=true(默認false)時(shí),核心線(xiàn)程會(huì )超時(shí)關(guān)閉
queueCapacity:任務(wù)隊列容量(阻塞隊列)
當核心線(xiàn)程數達到最大時(shí),新任務(wù)會(huì )放在隊列中排隊等待執行
maxPoolSize:最大線(xiàn)程數
當線(xiàn)程數>=corePoolSize,且任務(wù)隊列已滿(mǎn)時(shí)。線(xiàn)程池會(huì )創(chuàng )建新線(xiàn)程來(lái)處理任務(wù)
當線(xiàn)程數=maxPoolSize,且任務(wù)隊列已滿(mǎn)時(shí),線(xiàn)程池會(huì )拒絕處理任務(wù)而拋出異常
keepAliveTime:線(xiàn)程空閑時(shí)間
當線(xiàn)程空閑時(shí)間達到keepAliveTime時(shí),線(xiàn)程會(huì )退出,直到線(xiàn)程數量=corePoolSize
如果allowCoreThreadTimeout=true,則會(huì )直到線(xiàn)程數量=0
allowCoreThreadTimeout:允許核心線(xiàn)程超時(shí)
rejectedExecutionHandler:任務(wù)拒絕處理器
兩種情況會(huì )拒絕處理任務(wù):
當線(xiàn)程數已經(jīng)達到maxPoolSize,切隊列已滿(mǎn),會(huì )拒絕新任務(wù)
當線(xiàn)程池被調用shutdown()后,會(huì )等待線(xiàn)程池里的任務(wù)執行完畢,再shutdown。如果在調用shutdown()和線(xiàn)程池真正shutdown之間提交任務(wù),會(huì )拒絕新任務(wù)
線(xiàn)程池會(huì )調用rejectedExecutionHandler來(lái)處理這個(gè)任務(wù)。如果沒(méi)有設置默認是AbortPolicy,會(huì )拋出異常
ThreadPoolExecutor類(lèi)有幾個(gè)內部實(shí)現類(lèi)來(lái)處理這類(lèi)情況:
AbortPolicy 丟棄任務(wù),拋運行時(shí)異常
CallerRunsPolicy 執行任務(wù)
DiscardPolicy 忽視,什么都不會(huì )發(fā)生
DiscardOldestPolicy 從隊列中踢出最先進(jìn)入隊列(最后一個(gè)執行)的任務(wù)
實(shí)現RejectedExecutionHandler接口,可自定義處理器
二、ThreadPoolExecutor執行順序:
線(xiàn)程池按以下行為執行任務(wù)
當線(xiàn)程數小于核心線(xiàn)程數時(shí),創(chuàng )建線(xiàn)程。
當線(xiàn)程數大于等于核心線(xiàn)程數,且任務(wù)隊列未滿(mǎn)時(shí),將任務(wù)放入任務(wù)隊列。
當線(xiàn)程數大于等于核心線(xiàn)程數,且任務(wù)隊列已滿(mǎn)
若線(xiàn)程數小于最大線(xiàn)程數,創(chuàng )建線(xiàn)程
若線(xiàn)程數等于最大線(xiàn)程數,拋出異常,拒絕任務(wù)
三、如何設置參數
默認值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
如何來(lái)設置
需要根據幾個(gè)值來(lái)決定
tasks :每秒的任務(wù)數,假設為500~1000
taskcost:每個(gè)任務(wù)花費時(shí)間,假設為0.1s
responsetime:系統允許容忍的最大響應時(shí)間,假設為1s
做幾個(gè)計算
corePoolSize = 每秒需要多少個(gè)線(xiàn)程處理?
threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個(gè)線(xiàn)程。corePoolSize設置應該大于50
根據8020原則,如果80%的每秒任務(wù)數小于800,那么corePoolSize設置為80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
計算可得 queueCapacity = 80/0.1*1 = 80。意思是隊列里的線(xiàn)程可以等待1s,超過(guò)了的需要新開(kāi)線(xiàn)程來(lái)執行
切記不能設置為Integer.MAX_VALUE,這樣隊列會(huì )很大,線(xiàn)程數只會(huì )保持在corePoolSize大小,當任務(wù)陡增時(shí),不能新開(kāi)線(xiàn)程來(lái)執行,響應時(shí)間會(huì )隨之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
計算可得 maxPoolSize = (1000-80)/10 = 92
(最大任務(wù)數-隊列容量)/每個(gè)線(xiàn)程每秒處理能力 = 最大線(xiàn)程數
rejectedExecutionHandler:根據具體情況來(lái)決定,任務(wù)不重要可丟棄,任務(wù)重要則要利用一些緩沖機制來(lái)處理
keepAliveTime和allowCoreThreadTimeout采用默認通常能滿(mǎn)足
以上都是理想值,實(shí)際情況下要根據機器性能來(lái)決定。如果在未達到最大線(xiàn)程數的情況機器cpu load已經(jīng)滿(mǎn)了,則需要通過(guò)升級硬件(呵呵)和優(yōu)化代碼,降低taskcost來(lái)處理。
案例:
maven依賴(lài)
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
1、定義線(xiàn)程池異步任務(wù)配置類(lèi):com.zhixi.config
application.properties
# 異步線(xiàn)程配置
# 配置核心線(xiàn)程數
async.executor.thread.core_pool_size=5
# 配置最大線(xiàn)程數
async.executor.thread.max_pool_size=5
# 配置隊列大小
async.executor.thread.queue_capacity=99999
# 配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴
async.executor.thread.name.prefix=async-service-
線(xiàn)程池配置類(lèi):ThreadPoolTaskConfig
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @ClassName ThreadPoolTaskConfig
* @Description 線(xiàn)程池配置類(lèi)
*/
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolTaskConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線(xiàn)程數
executor.setCorePoolSize(corePoolSize);
//配置最大線(xiàn)程數
executor.setMaxPoolSize(maxPoolSize);
//配置隊列大小
executor.setQueueCapacity(queueCapacity);
//配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當pool已經(jīng)達到max size的時(shí)候,如何處理新任務(wù)
// 拒絕策略:CALLER_RUNS:不在新線(xiàn)程中執行任務(wù),而是有調用者所在的線(xiàn)程來(lái)執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執行初始化
executor.initialize();
return executor;
}
}
2、定義業(yè)務(wù)接口:com.zhixi.service
發(fā)送短信:AsyncEmailService
/**
* @ClassName AsyncService
* @Description 發(fā)送短信業(yè)務(wù)
*/
public interface AsyncEmailService {
/**
* 發(fā)送短信
*/
void executeAsync();
}
發(fā)送快遞:syncCommodityService
/**
* @ClassName AsyncCommodityService
* @Description 發(fā)送快遞的任務(wù)
*/
public interface AsyncCommodityService {
/**
* 發(fā)送快遞
*/
void expressDelivery();
}
3、業(yè)務(wù)接口實(shí)現類(lèi):com.zhixi.service.impl
短信接口實(shí)現類(lèi):AsyncEmailServiceImpl
import com.zhixi.service.AsyncEmailService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class AsyncEmailServiceImpl implements AsyncEmailService {
private static final Logger logger = LoggerFactory.getLogger(AsyncEmailServiceImpl.class);
@Override
@Async("taskExecutor")
public void executeAsync() {
logger.info("發(fā)送短信事件開(kāi)始執行~");
logger.info("發(fā)送短信中……");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("發(fā)送短信事件執行完畢");
}
}
發(fā)送快遞接口實(shí)現類(lèi):AsyncCommodityServiceImpl
import com.zhixi.service.AsyncCommodityService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class AsyncCommodityServiceImpl implements AsyncCommodityService {
private static final Logger logger = LoggerFactory.getLogger(AsyncCommodityServiceImpl.class);
@Async("taskExecutor")
@Override
public void expressDelivery() {
logger.info("發(fā)送快遞事件開(kāi)始執行~");
logger.info("發(fā)送快遞中……");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("發(fā)送快遞事件執行完畢");
}
}
4、視圖訪(fǎng)問(wèn)層:com.zhixi.controller
AsyncController
import com.zhixi.service.AsyncCommodityService;
import com.zhixi.service.AsyncEmailService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AsyncController {
@Autowired
private AsyncEmailService emailService;
@Autowired
private AsyncCommodityService commodityService;
@RequestMapping(value = "/async")
public void async() {
/*寄快遞業(yè)務(wù)方法*/
commodityService.expressDelivery();
/*發(fā)送短信業(yè)務(wù)方法*/
emailService.executeAsync();
}
}
還可以看看
其他文章,謝謝您的閱讀。
網(wǎng)站申明:系本文編輯轉載,來(lái)源于網(wǎng)絡(luò ),目的在于傳遞更多信息,并不代表本網(wǎng)贊同其觀(guān)點(diǎn)和對其真實(shí)性負責,所有權歸屬原作者。如內容、圖片有任何版權問(wèn)題,請
聯(lián)系我們刪除。