您的位置 首页 java

Java并发编程 – Executors创建线程池的几种方式

Executors类提供五种创建线程池的方式,分别为:

newCachedThreadPool: 用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换)

newFixedThreadPool: 创建一个固定大小的线程池, 因为采用无界的阻塞队列 ,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重,但是因为使用了无界阻塞队列,如果线程执行的时间长,如果不断的往线程池提交任务的话,会导致阻塞队列无限膨胀,从而导致内存溢出)。

newScheduledThreadPool :创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor: 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。

newWorkStealingPool: 基于ForkJoinPool类来创建一个拥有多个worker线程的线程池,每个任务队列都有自己的阻塞队列。

线程池创建方式比较

newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

源代码:

 //Executors.java
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
 }  

例子1:

 package test;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
public class ThreadPoolExecutorTest {  
 public static void main(String[] args) {  
  ExecutorService cachedThreadPool = Executors.newCachedThreadPool();  
  for (int i = 0; i < 10; i++) {  
   final int index = i;  
   try {  
    Thread.sleep(index * 1000);  
   } catch (InterruptedException e) {  
    e.printStackTrace();  
   }  
   cachedThreadPool.execute(new Runnable() {  
    public void run() {  
     System.out.println(index);  
    }  
   });  
  }  
 }  
}  
  

输出结果:

 pool-1-thread-1,index:0
index: 0, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:1
index: 1, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:2
index: 2, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:3
index: 3, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:4
index: 4, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:5
index: 5, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:6
index: 6, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:7
index: 7, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:8
index: 8, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
pool-1-thread-1,index:9
index: 9, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1  

分析:线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

例子2:

 private static void newCachedThreadPoolTest2() {
        ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            final long sleep = random.nextInt(1000) + 1000;
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(sleep);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + ",index:" + index + ", sleep:" + sleep);
                }
            });
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("index: " + i +
                    ", CorePoolSize:" + cachedThreadPool.getCorePoolSize() +
                    ", MaximumPoolSize:" + cachedThreadPool.getMaximumPoolSize() +
                    ", QueueSize:" + cachedThreadPool.getQueue().size() +
                    ", PoolSize:" + cachedThreadPool.getPoolSize());
        }
    }  

输出结果:

 index: 0, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:1
index: 1, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:2
index: 2, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:3
index: 3, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:4
index: 4, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:5
index: 5, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:6
index: 6, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:7
index: 7, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:8
index: 8, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:9
index: 9, CorePoolSize:0, MaximumPoolSize:2147483647, QueueSize:0, PoolSize:10
pool-1-thread-2,index:1, sleep:1070
pool-1-thread-3,index:2, sleep:1243
pool-1-thread-6,index:5, sleep:1059
pool-1-thread-7,index:6, sleep:1035
pool-1-thread-1,index:0, sleep:1808
pool-1-thread-4,index:3, sleep:1669
pool-1-thread-10,index:9, sleep:1078
pool-1-thread-9,index:8, sleep:1207
pool-1-thread-5,index:4, sleep:1955
pool-1-thread-8,index:7, sleep:1811  

分析:线程池为无限大,线程池会直接启动尽可能多的线程来处理每个任务,因此阻塞队列不会有任何等待任务。

newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在阻塞队列中等待。

源代码:

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }  

例子:

  private static void newFixedThreadPoolTest1(){
        ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            final long sleep = random.nextInt(1000) + 1000;
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(sleep);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + ",index:" + index + ", sleep:" + sleep);
                }
            });
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("index: " + i +
                    ", CorePoolSize:" + cachedThreadPool.getCorePoolSize() +
                    ", MaximumPoolSize:" + cachedThreadPool.getMaximumPoolSize() +
                    ", QueueSize:" + cachedThreadPool.getQueue().size() +
                    ", PoolSize:" + cachedThreadPool.getPoolSize());
        }  

输出结果:

 index: 0, CorePoolSize:3, MaximumPoolSize:3, QueueSize:0, PoolSize:1
index: 1, CorePoolSize:3, MaximumPoolSize:3, QueueSize:0, PoolSize:2
index: 2, CorePoolSize:3, MaximumPoolSize:3, QueueSize:0, PoolSize:3
index: 3, CorePoolSize:3, MaximumPoolSize:3, QueueSize:1, PoolSize:3
index: 4, CorePoolSize:3, MaximumPoolSize:3, QueueSize:2, PoolSize:3
index: 5, CorePoolSize:3, MaximumPoolSize:3, QueueSize:3, PoolSize:3
index: 6, CorePoolSize:3, MaximumPoolSize:3, QueueSize:4, PoolSize:3
index: 7, CorePoolSize:3, MaximumPoolSize:3, QueueSize:5, PoolSize:3
index: 8, CorePoolSize:3, MaximumPoolSize:3, QueueSize:6, PoolSize:3
index: 9, CorePoolSize:3, MaximumPoolSize:3, QueueSize:7, PoolSize:3
pool-1-thread-1,index:0, sleep:1246
pool-1-thread-3,index:2, sleep:1207
pool-1-thread-2,index:1, sleep:1762
pool-1-thread-1,index:3, sleep:1755
pool-1-thread-3,index:4, sleep:1686
pool-1-thread-2,index:5, sleep:1523
pool-1-thread-1,index:6, sleep:1381
pool-1-thread-3,index:7, sleep:1605
pool-1-thread-2,index:8, sleep:1655
pool-1-thread-1,index:9, sleep:1216  

分析:
因为线程池大小为3,线程池会启动3个线程来处理任务,剩下的7个任务会进入到阻塞队列。

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

源代码:

 // Executors.java 
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

//ScheduledThreadPoolExecutor.java
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
// ThreadPoolExecutor.java
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }  

例子1:

 private static void newScheduledThreadPoolTest() {
        ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(5);
        for(int i = 0;i<10;i++) {
            final int index = i;
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ",index:" + index + ", message: delay 1 seconds, and execute every 3 seconds");
                }
            }, 1, 3, TimeUnit.SECONDS);
            System.out.println("index: " + i +
                    ", CorePoolSize:" + scheduledThreadPool.getCorePoolSize() +
                    ", MaximumPoolSize:" + scheduledThreadPool.getMaximumPoolSize() +
                    ", QueueSize:" + scheduledThreadPool.getQueue().size() +
                    ", PoolSize:" + scheduledThreadPool.getPoolSize());
        }
    }  

输出结果:

 index: 0, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:1, PoolSize:1
index: 1, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:2, PoolSize:2
index: 2, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:3, PoolSize:3
index: 3, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:4, PoolSize:4
index: 4, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:5, PoolSize:5
index: 5, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:6, PoolSize:5
index: 6, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:7, PoolSize:5
index: 7, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:8, PoolSize:5
index: 8, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:9, PoolSize:5
index: 9, CorePoolSize:5, MaximumPoolSize:2147483647, QueueSize:10, PoolSize:5
pool-1-thread-1,index:0, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:1, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:2, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:3, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:4, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:5, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:6, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:7, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:8, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:9, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:0, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:1, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:4, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:6, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:8, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:9, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-4,index:2, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:7, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-5,index:3, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-1,index:5, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:0, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-4,index:1, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-5,index:3, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:2, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-1,index:4, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-4,index:5, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-1,index:9, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-2,index:6, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-3,index:7, message: delay 1 seconds, and execute every 3 seconds
pool-1-thread-5,index:8, message: delay 1 seconds, and execute every 3 seconds  

分析:每个任务延迟1秒后每3秒执行一次,5个线程轮流来执行10个任务。

newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。

源代码:

 // Executors.java
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

  private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        @SuppressWarnings("deprecation")
        protected void finalize() {
            super.shutdown();
        }
    }

FinalizableDelegatedExecutorService  

例子:

 private static void newSingleThreadExecutorTest() {
        ExecutorService scheduledThreadPool = (ExecutorService)Executors.newSingleThreadExecutor();
        for(int i = 0;i<10;i++) {
            final int index = i;
            scheduledThreadPool.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + ",index:" + index );
                }
            });
        }
    }  

输出结果:

 pool-1-thread-1,index:0
pool-1-thread-1,index:1
pool-1-thread-1,index:2
pool-1-thread-1,index:3
pool-1-thread-1,index:4
pool-1-thread-1,index:5
pool-1-thread-1,index:6
pool-1-thread-1,index:7
pool-1-thread-1,index:8
pool-1-thread-1,index:9  

分析:线程池只启动一个线程,然后根据任务提交的顺序依次执行任务,相当于顺序执行各个任务。

newWorkStealingPool(since 1.8)

创建一个拥有多个任务队列的线程池,如果不指定并行数量(parallelism),将会按照可用cpu数量的来定义任务队列的大小,适用于大耗时的操作,可以并行来执行多个任务。

源代码:

  public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }  

例子:

 private static void newWorkStealingPoolTest() {
        ForkJoinPool workStealingPool = (ForkJoinPool) Executors.newWorkStealingPool(3);
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            final long sleep = random.nextInt(1000) + 1000;
            workStealingPool.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(sleep);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + ",index:" + index + ", sleep:" + sleep);
                }
            });
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("index: " + i +
                    ", Parallelism:" + workStealingPool.getParallelism() +
                    ", QueuedSubmissionCount:" + workStealingPool.getQueuedSubmissionCount() +
                    ", PoolSize:" + workStealingPool.getPoolSize());
        }
        
        //ForkJoinPool会随着主线程的退出而关闭,这里通过sleep等待线程池内的线程执行完毕
        try {
            Thread.sleep(100 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }  

输出结果:

 index: 0, Parallelism:3, QueuedSubmissionCount:0, PoolSize:1
index: 1, Parallelism:3, QueuedSubmissionCount:0, PoolSize:2
index: 2, Parallelism:3, QueuedSubmissionCount:0, PoolSize:3
index: 3, Parallelism:3, QueuedSubmissionCount:1, PoolSize:3
index: 4, Parallelism:3, QueuedSubmissionCount:2, PoolSize:3
index: 5, Parallelism:3, QueuedSubmissionCount:3, PoolSize:3
index: 6, Parallelism:3, QueuedSubmissionCount:4, PoolSize:3
index: 7, Parallelism:3, QueuedSubmissionCount:5, PoolSize:3
index: 8, Parallelism:3, QueuedSubmissionCount:6, PoolSize:3
index: 9, Parallelism:3, QueuedSubmissionCount:7, PoolSize:3
ForkJoinPool-1-worker-5,index:1, sleep:1147
ForkJoinPool-1-worker-7,index:2, sleep:1556
ForkJoinPool-1-worker-3,index:0, sleep:1897
ForkJoinPool-1-worker-5,index:3, sleep:1435
ForkJoinPool-1-worker-7,index:4, sleep:1328
ForkJoinPool-1-worker-3,index:5, sleep:1992
ForkJoinPool-1-worker-5,index:6, sleep:1658
ForkJoinPool-1-worker-7,index:7, sleep:1952
ForkJoinPool-1-worker-3,index:8, sleep:1822
ForkJoinPool-1-worker-5,index:9, sleep:1854  

分析:启动三个worker线程,先执行三个任务,剩下的七个任务平均分配给这三个worker的线程的对应的阻塞队列。

文章来源:智云一二三科技

文章标题:Java并发编程 – Executors创建线程池的几种方式

文章地址:https://www.zhihuclub.com/189046.shtml

关于作者: 智云科技

热门文章

网站地图