Executors类提供五种创建线程池的方式,分别为:
newCachedThreadPool: 用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换)
newFixedThreadPool: 创建一个固定大小的线程池, 因为采用无界的阻塞队列 ,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重,但是因为使用了无界阻塞队列,如果线程执行的时间长,如果不断的往线程池提交任务的话,会导致阻塞队列无限膨胀,从而导致内存溢出)。
newScheduledThreadPool :创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor: 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。
newWorkStealingPool: 基于ForkJoinPool类来创建一个拥有多个worker线程的线程池,每个任务队列都有自己的阻塞队列。
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
源代码:
//Executors.java
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
例子1:
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}
}
输出结果:
pool-1-thread-1,index:0
index: 0, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:1
index: 1, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:2
index: 2, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:3
index: 3, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:4
index: 4, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:5
index: 5, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:6
index: 6, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:7
index: 7, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:8
index: 8, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:9
index: 9, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
分析:线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
例子2:
private static void newCachedThreadPoolTest2() {
ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
Random random = new Random();
for (int i = 0; i < 10; i++) {
final int index = i;
final long sleep = random.nextInt(1000) + 1000;
cachedThreadPool.execute(new Runnable() {
public void run() {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ",index:" + index + ", sleep:" + sleep);
}
});
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("index: " + i +
", CorePoolSize:" + cachedThreadPool.getCorePoolSize() +
", MaximumPoolSize:" + cachedThreadPool.getMaximumPoolSize() +
", QueueSize:" + cachedThreadPool.getQueue().size() +
", PoolSize:" + cachedThreadPool.getPoolSize());
}
}
输出结果:
index: 0, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
index: 1, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:2
index: 2, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:3
index: 3, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:4
index: 4, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:5
index: 5, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:6
index: 6, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:7
index: 7, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:8
index: 8, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:9
index: 9, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:10
pool-1-thread-2,index:1, sleep:1070
pool-1-thread-3,index:2, sleep:1243
pool-1-thread-6,index:5, sleep:1059
pool-1-thread-7,index:6, sleep:1035
pool-1-thread-1,index:0, sleep:1808
pool-1-thread-4,index:3, sleep:1669
pool-1-thread-10,index:9, sleep:1078
pool-1-thread-9,index:8, sleep:1207
pool-1-thread-5,index:4, sleep:1955
pool-1-thread-8,index:7, sleep:1811
分析:线程池为无限大,线程池会直接启动尽可能多的线程来处理每个任务,因此阻塞队列不会有任何等待任务。
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在阻塞队列中等待。
源代码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
例子:
private static void newFixedThreadPoolTest1(){
ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
Random random = new Random();
for (int i = 0; i < 10; i++) {
final int index = i;
final long sleep = random.nextInt(1000) + 1000;
cachedThreadPool.execute(new Runnable() {
public void run() {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ",index:" + index + ", sleep:" + sleep);
}
});
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("index: " + i +
", CorePoolSize:" + cachedThreadPool.getCorePoolSize() +
", MaximumPoolSize:" + cachedThreadPool.getMaximumPoolSize() +
", QueueSize:" + cachedThreadPool.getQueue().size() +
", PoolSize:" + cachedThreadPool.getPoolSize());
}
输出结果:
index: 0, CorePoolSize:3, MaximumPoolSize:3, QueueSize:0, PoolSize:1
index: 1, CorePoolSize:3, MaximumPoolSize:3, QueueSize:0, PoolSize:2
index: 2, CorePoolSize:3, MaximumPoolSize:3, QueueSize:0, PoolSize:3
index: 3, CorePoolSize:3, MaximumPoolSize:3, QueueSize:1, PoolSize:3
index: 4, CorePoolSize:3, MaximumPoolSize:3, QueueSize:2, PoolSize:3
index: 5, CorePoolSize:3, MaximumPoolSize:3, QueueSize:3, PoolSize:3
index: 6, CorePoolSize:3, MaximumPoolSize:3, QueueSize:4, PoolSize:3
index: 7, CorePoolSize:3, MaximumPoolSize:3, QueueSize:5, PoolSize:3
index: 8, CorePoolSize:3, MaximumPoolSize:3, QueueSize:6, PoolSize:3
index: 9, CorePoolSize:3, MaximumPoolSize:3, QueueSize:7, PoolSize:3
pool-1-thread-1,index:0, sleep:1246
pool-1-thread-3,index:2, sleep:1207
pool-1-thread-2,index:1, sleep:1762
pool-1-thread-1,index:3, sleep:1755
pool-1-thread-3,index:4, sleep:1686
pool-1-thread-2,index:5, sleep:1523
pool-1-thread-1,index:6, sleep:1381
pool-1-thread-3,index:7, sleep:1605
pool-1-thread-2,index:8, sleep:1655
pool-1-thread-1,index:9, sleep:1216
分析:
因为线程池大小为3,线程池会启动3个线程来处理任务,剩下的7个任务会进入到阻塞队列。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
源代码:
// Executors.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor.java
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
// ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
例子1:
private static void newScheduledThreadPoolTest() {
ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(5);
for(int i = 0;i<10;i++) {
final int index = i;
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + ",index:" + index + ", message: delay 1 seconds, and execute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
System.out.println("index: " + i +
", CorePoolSize:" + scheduledThreadPool.getCorePoolSize() +
", MaximumPoolSize:" + scheduledThreadPool.getMaximumPoolSize() +
", QueueSize:" + scheduledThreadPool.getQueue().size() +
", PoolSize:" + scheduledThreadPool.getPoolSize());
}
}
输出结果:
index: 0, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:1, PoolSize:1
index: 1, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:2, PoolSize:2
index: 2, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:3, PoolSize:3
index: 3, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:4, PoolSize:4
index: 4, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:5, PoolSize:5
index: 5, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:6, PoolSize:5
index: 6, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:7, PoolSize:5
index: 7, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:8, PoolSize:5
index: 8, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:9, PoolSize:5
index: 9, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:10, PoolSize:5
pool-1-thread-1,index:0, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:1, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:2, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:3, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:4, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:5, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:6, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:7, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:8, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:9, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:0, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:1, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:4, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:6, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:8, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:9, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-4,index:2, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:7, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-5,index:3, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-1,index:5, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:0, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-4,index:1, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-5,index:3, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:2, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-1,index:4, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-4,index:5, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-1,index:9, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:6, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:7, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-5,index:8, message: delay 1 seconds, and execute every 3 seconds
分析:每个任务延迟1秒后每3秒执行一次,5个线程轮流来执行10个任务。
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。
源代码:
// Executors.java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
private static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
@SuppressWarnings("deprecation")
protected void finalize() {
super.shutdown();
}
}
FinalizableDelegatedExecutorService
例子:
private static void newSingleThreadExecutorTest() {
ExecutorService scheduledThreadPool = (ExecutorService)Executors.newSingleThreadExecutor();
for(int i = 0;i<10;i++) {
final int index = i;
scheduledThreadPool.execute(new Runnable() {
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ",index:" + index );
}
});
}
}
输出结果:
pool-1-thread-1,index:0
pool-1-thread-1,index:1
pool-1-thread-1,index:2
pool-1-thread-1,index:3
pool-1-thread-1,index:4
pool-1-thread-1,index:5
pool-1-thread-1,index:6
pool-1-thread-1,index:7
pool-1-thread-1,index:8
pool-1-thread-1,index:9
分析:线程池只启动一个线程,然后根据任务提交的顺序依次执行任务,相当于顺序执行各个任务。
newWorkStealingPool(since 1.8)
创建一个拥有多个任务队列的线程池,如果不指定并行数量(parallelism),将会按照可用cpu数量的来定义任务队列的大小,适用于大耗时的操作,可以并行来执行多个任务。
源代码:
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
例子:
private static void newWorkStealingPoolTest() {
ForkJoinPool workStealingPool = (ForkJoinPool) Executors.newWorkStealingPool(3);
Random random = new Random();
for (int i = 0; i < 10; i++) {
final int index = i;
final long sleep = random.nextInt(1000) + 1000;
workStealingPool.execute(new Runnable() {
public void run() {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ",index:" + index + ", sleep:" + sleep);
}
});
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("index: " + i +
", Parallelism:" + workStealingPool.getParallelism() +
", QueuedSubmissionCount:" + workStealingPool.getQueuedSubmissionCount() +
", PoolSize:" + workStealingPool.getPoolSize());
}
//ForkJoinPool会随着主线程的退出而关闭,这里通过sleep等待线程池内的线程执行完毕
try {
Thread.sleep(100 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出结果:
index: 0, Parallelism:3, QueuedSubmissionCount:0, PoolSize:1
index: 1, Parallelism:3, QueuedSubmissionCount:0, PoolSize:2
index: 2, Parallelism:3, QueuedSubmissionCount:0, PoolSize:3
index: 3, Parallelism:3, QueuedSubmissionCount:1, PoolSize:3
index: 4, Parallelism:3, QueuedSubmissionCount:2, PoolSize:3
index: 5, Parallelism:3, QueuedSubmissionCount:3, PoolSize:3
index: 6, Parallelism:3, QueuedSubmissionCount:4, PoolSize:3
index: 7, Parallelism:3, QueuedSubmissionCount:5, PoolSize:3
index: 8, Parallelism:3, QueuedSubmissionCount:6, PoolSize:3
index: 9, Parallelism:3, QueuedSubmissionCount:7, PoolSize:3
ForkJoinPool-1-worker-5,index:1, sleep:1147
ForkJoinPool-1-worker-7,index:2, sleep:1556
ForkJoinPool-1-worker-3,index:0, sleep:1897
ForkJoinPool-1-worker-5,index:3, sleep:1435
ForkJoinPool-1-worker-7,index:4, sleep:1328
ForkJoinPool-1-worker-3,index:5, sleep:1992
ForkJoinPool-1-worker-5,index:6, sleep:1658
ForkJoinPool-1-worker-7,index:7, sleep:1952
ForkJoinPool-1-worker-3,index:8, sleep:1822
ForkJoinPool-1-worker-5,index:9, sleep:1854
分析:启动三个worker线程,先执行三个任务,剩下的七个任务平均分配给这三个worker的线程的对应的阻塞队列。