您的位置 首页 java

深入理解Java线程池,这一篇就够了

【死记硬背】

1 理解

线程 是稀缺资源,它的创建和销毁是一个相对偏重且耗资源的操作,而Java线程依赖于内核线程,创建线程需要进行操作系统状态切换,为避免资源过度消耗需要设法重用线程执行多个任务。 线程池 就是一个线程缓存,负责对线程进行统一分配、调度和监控。

2 线程池的作用

* 限定线程的个数,不会导致由于线程过多导致系统运行缓慢或崩溃。

* 线程池不需要每次都去创建和销毁,节约了资源,并且响应速度快。

3 什么时候用线程池

* 单个任务处理的时间比较短。

* 需要处理的任务数量很大。

4 如何设置线程池的大小

* CPU 密集型任务:

CPU个数+1的线程数

* IO密集型任务:

两倍CPU个数+1的线程数

5 线程池的组成

一般的线程池主要由4部分组成:

5.1 线程池管理器:用于创建并管理线程池。

5.2 工作线程:线程池中的线程。

5.3 任务接口:每个任务必须实现的接口,用于工作线程调度其运行。

5.4 任务队列:用于存放待处理的任务,提供一种缓冲机制。

6 线程池的实现方式

线程池的主要实现方式有五种:newCached thread Pool、newFixedThreadPool、newScheduledThreadPool、newSingleThreadExecutor和newWorkStealingPool。是通过Executor框架实现的,主要用到了Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future和FutureTask这几个类。

7 线程池涉及到的主要参数

7.1 corePoolSize:指定了线程池中的线程数量。

7.2 maximumPoolSize:指定了线程池中最大线程数量。

7.3 keepAliveTime:当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多少时间内会被销毁。

7.4 unit:keepAliveTime的 时间单位

7.5 workQueue:任务队列,被提交但尚未被执行的任务。

7.6 threadFactory:线程工厂,用于创建线程,一般用默认的即可。

7.7 handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。

8 拒绝策略

JDK 内置的拒绝策略如下:

8.1 AbortPolicy:直接抛出异常,阻止系统正常运行。

8.2 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

8.3 DiscardOldestPolicy:丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

8.4 DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。

以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略扔无法满足实际需要,完全可以自己扩展RejectedExecutionHandler接口。

9 Java线程池的工作过程

9.1 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。

9.2 当调用 execute() 方法添加一个任务时,线程池会做如下判断:

a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;

b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;

c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池

会抛出异常 RejectExecutionException。

9.3 当一个线程完成任务时,它会从队列中取下一个任务来执行。

9.4 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

【答案解析】

五种常用线程池的实现方式:

newCachedThreadPool

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 它是一个可以无限扩大的线程池;
 * 它比较适合处理执行时间比较小的任务;
 * corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
 * keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
 * 采用 Synchronous Queue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,
 * 就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
 */
public class CachedThreadPoolTest {
    public  static   void  main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCached Thread Pool();
        for (int i=0;i<1000;i++){
            /*try{
                Thread.sleep(2000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }*/
            cachedThreadPool.execute(new Runnable() {
                @ Override 
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"正在被执行");
                }
            });
            System.out.println("执行完毕");
        }

    }
}  

newFixedThreadPool

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 它是一种固定大小的线程池;
 * corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
 * keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;
 * 但这里keepAliveTime无效;
 * 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
 * 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
 * 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效
 */
public class FixedThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i=0;i<10;i++){
            final int index = i;
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        System.out.println(Thread.currentThread().getName()+"正在执行"+index);
                        Thread.sleep(2000);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}  

newScheduledThreadPool

 import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
 * 1 scheduledAtFixedRate
 * 2 scheduledWithFixedDelay
 * SchduledFutureTask接收的参数:
 * time:任务开始的时间
 * sequenceNumber:任务的序号
 * period:任务执行的时间间隔
 * 它采用DelayQueue存储等待的任务
 * DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
 * DelayQueue也是一个无界队列;
 * 工作线程的执行过程:
 * 工作线程会从DelayQueue取已经到期的任务去执行;
 * 执行结束后重新设置任务的到期时间,再次放回DelayQueue
 */
public class ScheduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        /**/
        scheduledThreadPool.schedule(new Runnable(){
            public void run(){
                System.out.println("延迟1秒执行");
            }
        }, 1, TimeUnit.SECONDS);
        scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                System.out.println("执行");
            }
        },1,1,TimeUnit.SECONDS);
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("执行3");
            }
        },1,3,TimeUnit.SECONDS);
    }
}  

newSingleThreadExecutor

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 它只会创建一条工作线程处理任务;
 * 采用的阻塞队列为LinkedBlockingQueue;
 */
public class SingleThreadExecutorTest {
    public static void main(String[] args) {
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "正在被执行,打印的值是:" + index);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}  

newWorkStealingPool

 import java.util.concurrent.*;
public class WorkStealingPoolTest {
    // 线程数
    private static final int threads = 10;
    // 用于计数线程是否执行完成
    static CountDownLatch countDownLatch = new CountDownLatch(threads);

    public static void main(String[] args) throws Exception{
        test3();
    }

    public static void test1() throws Exception{
        System.out.println("---- start ----");
        ExecutorService executorService = Executors.newWorkStealingPool();
        for (int i = 0; i < threads; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName());
                } catch (Exception e) {
                    System.out.println(e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        System.out.println("---- end ----");
    }

    public static void test2() throws InterruptedException{
        System.out.println("---- start ----");
        ExecutorService executorService = Executors.newWorkStealingPool();
        for (int i = 0; i < threads; i++) {
//            Callable 带返回值
            executorService.submit(new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            }));
        }
        countDownLatch.await();
        System.out.println("---- end ----");
    }

    public static void test3() throws Exception{
        System.out.println("---- start ----");
        ExecutorService executorService = Executors.newWorkStealingPool();
        for (int i = 0; i < threads; i++) {
//          Runnable 带返回值
            FutureTask<?> futureTask = new FutureTask<>(new Callable<String>() {
                /**
                 * call
                 * @return currentThreadName
                 */
                @Override
                public String call() {
                    return Thread.currentThread().getName();
                }
            });
            executorService.submit(new Thread(futureTask));
            System.out.println(futureTask.get());
        }
        System.out.println("---- end ----");
    }
}  

【温馨提示】

点赞+收藏文章,关注我并私信回复【面试题解析】,即可100%免费领取楼主的所有面试题资料!

文章来源:智云一二三科技

文章标题:深入理解Java线程池,这一篇就够了

文章地址:https://www.zhihuclub.com/189249.shtml

关于作者: 智云科技

热门文章

网站地图