您的位置 首页 golang

线程池解析第一章-源码解析

线程池 基本介绍

为什么要使用线程池

对于系统和服务器来说,创建和销毁一个 线程 所需要消耗的时间和资源可能比处理相关业务所消耗的时间和资源还要多还要久,不仅如此,计算机为了提高运算效率,CPU会在众多的线程之间不断的进行线程间的切换,如果线程可以随意的创建,过多的线程之间进行频繁的切换也会占用大量的内存和资源

创建线程池的两种方式
  • 通过Executors创建线程池,这是官方较为推荐的一种方式,它们均为大多数使用场景预定义了设置
  • 通过构造方法创建自定义的线程池
 ExecutorService executorService = new ThreadPoolExecutor(5, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
  
线程池处理任务流程

当提交新任务到线程池当中,线程池主要做了以下几个判断

  • 检查线程池当中的线程数是否已经超过了核心线程数,如果没有,则创建新的线程对任务进行处理
  • 如果线程池当中的线程数量已经达到了设定的核心线程数的数量,则将当前任务加入到任务队列当中,任务队列中的任务由空闲线程执行
  • 如果任务队列中的任务数量已经足够了,线程池则会判断当前线程池中的线程数量是否已经超出最大线程数,如果没有超出最大线程数,则会创建一个普通线程去处理提交的这个任务

需要c/c++ Linux服务器高阶知识、电子书籍、视频资料的朋友请后台私信【架构】获取

知识点有C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等等。

线程池解析第一章-源码解析

源码解析

创建一个线程池

通过自定义的方式构造一个线程池,调用了构造线程池的构造方法,创建一个 ThreadPoolExecutor 对象,并返回ExecutorService类型

 ExecutorService executorService = new ThreadPoolExecutor(5, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
  

通过上述构造方法,我们来对ThreadPoolExecutor类进行解析,它也会线程池处理线程问题的源码

ThreadPoolExecutor基本构成
   public class ThreadPoolExecutor extends AbstractExecutorService {

        //省略
        ...
    }

    public abstract class AbstractExecutorService implements ExecutorService {
        
        //省略
        ...
    }

    public interface ExecutorService extends Executor {
        //省略
        ...
    }

    public interface Executor {
        void execute(Runnable command);
    }
  

此处我将ThreadPoolExecutor类的继承关系拆开了,ThreadPoolExecutor类继承AbstractExecutorService,AbstractExecutorService是executorService接口的实现类,executorService是executor的子接口

  • executorService中的方法主要是对线程池中提交的任务进行控制管理,该类可以对任务进行提交和关闭等操作
  • AbstractExecutorService是线程池类的实现类,里面主要是executorService方法的实现
  • ThreadPoolExecutor类主要是对提交的线程进行控制,管理,执行
线程池状态源码基本变量组成及解析
   //通过ctl获取runState和workerCount状态
    private final Atomic Integer  ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static final int COUNT_BITS = Integer.SIZE - 3;//
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//最大线程数容量

    private static final int RUNNING    = -1 << COUNT_BITS;//线程池可以接受新任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//线程池不在接受新任务(当工作线程大于等于最大线程的时候)
    private static final int STOP       =  1 << COUNT_BITS;//线程池不在接受新任务,不在执行队列中的任务
    private static final int TIDYING    =  2 << COUNT_BITS;//线程池中所有任务均终止
    private static final int TERMINATED =  3 << COUNT_BITS;//线程池彻底终止


    // ctl操作
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    //当需要操作workers对象的时候,需要获取锁资源
    private final ReentrantLock mainLock = new ReentrantLock();

    //工作线程集合,包含池中所有的工作线程,只能在拿着mainLock时才能访问。
    //Worker 引用存在workers集合里面
    private final HashSet<Worker> workers = new HashSet<Worker>();
  

变量ctl是非常特殊的一个变量,他是用来表示线程池状态和当前线程数量的一个变量,此处申明了final类型,说明AtomicInteger类不可以被继承,需要注意的是AtomicInteger是一个类 ctl是一个原子整数,利用高低位包装了如下两个概念

  • runState:线程池运行状态,占据高三位,主要有RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED这几个状态
  • workerCount:线程池中当前活动的线程数量,占据第29位

集合workers用来存放被创建的工作线程,包含线程池中的所有工作线程,只有在拿到mainLock的时候才能进行访问

ThreadPoolExecutor执行入口方法
 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        //判断运行的线程是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //启动新线程执行任务然后返回
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //当线程池状态是运行状态且成功加入工作队列中
        //当workerCountOf(c)的数量大于等于corePoolSize的时候
        // 此处需要注意,在任务数超过核心线程数的时候,线程池将任务添加进队列中由核心线程来处理,在大多数时候,线程池中的线程数维持在核心线程数之内不会额外创建线程
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();//重新获取线程池运行状态
            if (! isRunning(recheck) && remove(command))//如果运行状态不是可运行,则移除当前任务
                 reject(command);//关闭当前任务
            else if (workerCountOf(recheck) == 0)//当线程池中的线程数是0时,此时任务已经加入到workerQueue中了,此处的含义应该是加入一个空的线程,目的是消费workerQueue中的任务
                addWorker(null, false);
        }
        //如果队列也已经满了,则创建一个线程去执行任务,如果工作线程数量超过了最大线程数量,则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
  

该方法接收需要进行处理的任务,是一个实现了Runnable接口的对象,在未来的某个时候执行给出的任务,这个任务由一个新线程或是线程池中的线程进行执行, 如果这个任务不能被正确执行,不管什么原因,都提交给RejectedExecutionHandler去处理 在执行任务的时候,主要有以下几种情况

  • 当前线程数量小于核心线程数,此时将会创建新的线程去执行任务
  • 当前线程数量大于核心线程数的时候,将当前任务加入到工作队列中去,并再次检查线程池运行状态,如果当前任务数量为0则创建一个线程去处理任务
  • 如果工作队列也已经满了,则创建一个线程去执行该任务,一般这种线程在keepAliveTime到期后就会自动销毁

此处需要注意的是当任务被加入到任务队列中后,当发现可用于执行任务的线程数为0时,会构造一个不包含任务的工作线程,该线程的作用是去处理之前被放入到工作队列中的任务的 此处提出一个问题,对于线程池来说,如果线程池中的线程数没有到达核心线程数的话,则创建核心线程,如果超过了,创建的线程就不是核心线程了,那么核心线程和非核心线程的区别在哪里?对于这个问题,下面在解析getTask源码的时候会进行解释,线程池对于核心线程和非核心线程的不同处理

构建Worker并执行firstTask
 private boolean addWorker(Runnable firstTask, boolean core) {

        //第一阶段,主要作用是检查,检查线程池运行状态和活动线程数量是否符合要求,否则返回false

        retry:// continue/break跳出标记位,其中break表示要跳过这个标记的循环,continue表示从这个标记的循环继续执行
        for (;;) {
            int c = ctl.get();//获取当前线程池状态和线程数量
            int rs = runStateOf(c);//获取线程池运行状态

            // 检查当前线程池状态,如果状态大于等于SHUTDOWN(说明线程池已经停止)
            // 且线程池已关闭且任务为空且工作队列不为空,返回false
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;   //返回false

            //该循环主要是检测线程池中活动线程数量是否在合理的范围之内,否则返回false
            //如果活动线程数在规定的范围之内,则更新活动线程数并跳出循环执行work进行创建线程
            for (;;) {
                int wc = workerCountOf(c);  //获取工作线程的数量
                //如果工作线程数量大于活动线程数或者大于核心线程数
                //core为true则是核心线程数
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;   //返回false
                //比较当前值是否和c相同,如果相同则c+1并直接跳出大循环,执行work进行线程的创建
                if (compareAndIncrementWorkerCount(c))
                    break retry;//退出循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)//如果当前的线程运行状态不是rs(方法开头获取的运行状态)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        //第二阶段:创建Worker对象,并启动线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //根据你给的task创建worker对象
            //Worker类实现了Runnable接口,worker对象实现run方法
            w = new Worker(firstTask);
            //线程t是用来处理worker对象中任务的线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    //rs < SHUTDOWN表示线程池处于可运行状态
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // t为新创建的线程,在workQueue运行状态下如果已经启动需要抛出异常
                            throw new IllegalThreadStateException();

                        //将创建的线程添加到线程池的集合中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;//此处的作用是启动该线程
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {//为true时启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)//如果worker创建失败,则加入addWorkerFailed
                addWorkerFailed(w);
        }
        return workerStarted;
    }


  private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    {
        
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        //AQS独占锁模板方法,读者有兴趣也可以看下,此处省略
        ...
}
  

该方法是线程池处理提交任务的核心方法,线程池启动线程的地方,实现run方法,主要是分为两个阶段

  • 第一阶段,主要作用是检查,检查线程池运行状态和活动线程数量相关问题
  • 第二阶段,创建worker对象,并启动线程执行任务

第二阶段是进行工作线程的创建,创建工作线程,首先是建立一个Worker对象,由源码可知,Worker继承了AQS,实现了runnable,内部封装了待处理的任务和处理这个任务所需的线程,通过Worker内部的run方法,可以实现具体任务的逻辑,需要注意的是,Worker是继承了AQS的,且是独占锁,在Worker的run方法中调用了runWorker方法,在runWorker方法中,线程只有在获取到锁资源的情况下,才可以处理Worker对象中的被封装的任务

在构建完Worker对象后,后面的代码看着挺多,实际上就是为执行worker对象中的线程做准备

  • 首先获取锁资源,这里获取锁的原因是因为要将可执行worker对象放入工作集合中,当线程池的状态是可运行但是被构建的线程已经执行,则抛出异常,因为此时线程才被创建还没有被运行,如果运行了,则要抛出
  • 此时将当前被创建的worker对象加入到集合当中,并记录当前线程池最大线程数,释放锁,并执行线程方法
Worker中run方法的实现类
 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();//获取当前线程,即为正在执行的,创建了worker的线程
        Runnable task = w.firstTask;//获取封装进worker中的task任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //1:如果创建该worker时传递的task不为空(当前worker对应的任务还没有执行),则去执行task
            //2:如果worker中的task已经执行完了,则去检查是否还有task任务没有执行,如果有则获取workQueue的task并执行
            while (task != null || (task = getTask()) != null) {
                //Worker继承AQS,目的是想使用独占锁来表示线程是否正在执行任务
                w.lock();
                // 如果线程池停止了,确保线程被打断
                // 如果没有被打断,第二种情况要求在清除中断时去处理shutdownNow方法的竞争
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();//中断该线程
                try {
                    beforeExecute(wt, task);//这是ThreadPoolExecutor提供给子类的扩展方法
                    Throwable thrown = null;
                    try {
                        task.run();//futureTask执行的地方
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;//任务变量变为null
                    w.completedTasks++;//之前线程的数量加1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //销毁当前线程
            processWorkerExit(w, completedAbruptly);
        }
    }
  

执行worker中的任务,也就是我们提交给线程池中任务的逻辑代码,此时,则完成了线程池对当前任务的处理流程 如果任务执行完成了,则继续执行workQueue中的任务,如果worker中的任务为空,则单独执行队列中的任务,此处保证了线程在还有任务没有完成的情况下,不断的从任务队列中获取任务去处理,避免创建无谓的线程,此处需要注意的是getTask()方法,通过该方法,线程不断的从任务队列中获取任务

获取workQueue中的任务
 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取当前线程池的运行状态

            // 如果线程池当前状态已经停止且队列是空时返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//重新设置当前线程池状态,内部通过cas方式实现状态的修改
                return null;//返回
            }

            int wc = workerCountOf(c);//获取工作线程的数量

            // 当前线程数是否大于核心线程数,是则返回true或者allowCoreThreadTimeOut被设置为true时,
            // 在keepAliveTime到了之后,将会销毁非核心线程或是allowCoreThreadTimeOut被设置为true的核心线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //当前线程数大于最大线程数或者(当前线程数大于核心线程数且timedOut过)且(wc > 1 || workQueue.isEmpty())
            //return将会返回null,程序销毁当前线程
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))//cas减少任务数量
                    return null;
                continue;//继续进行循环
            }

            try {
                //获取并从workQueue中移除该任务
                //如果false则从队列中获取任务,如果为true则表示当在时间keepAliveTime内没有获取到任务,此时移除该任务。此时任务r为null,并将timedOut设置为true,用于上面的判断条件
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)//如果获取的任务不为null则返回该对象,设置timeout为true,目的是移除当前线程
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  

获取workQueue中的任务,获取任务后直接返回任务,没有获取成功则会返回null,以下几种情况将会返回null,如果返回null且当前线程是非核心线程,则会直接销毁当且线程,如果当前线程是核心线程,如果设置allowCoreThreadTimeOut为true,核心线程也会被销毁

  • 线程池已经停止工作了
  • 队列已经空了
  • 如果当前线程为非核心线程或allowCoreThreadTimeOut,在keepAliveTime内没有获取到任务
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))//cas减少任务数量
                    return null;
                continue;//继续进行循环
            }
  

在上面的代码中,当timed为true的时候,说明当前线程只有两种身份,非核心线程或是允许被销毁的核心线程 timedOut默认为false,当在keepaliveTime的时间内没有获取到任务,将会被标注为true 判断中当线程池中线程数大于所设定的线程数或者timed为true且timedOut为true的时候满足其中之一,当工作线程数量大于1或是工作队列是空的情况下,就会返回null

回到文章一开始提出的一个问题,核心线程和非核心线程的区别 如果getTask()返回null的话,则直接销毁非核心线程,但是核心线程不会被销毁(如果核心线程做了设置也会进行销毁)

至此,线程池源码对于我们提交任务的处理的解析就全部完成了,如果文章有不对的地方,欢迎指正

文章来源:智云一二三科技

文章标题:线程池解析第一章-源码解析

文章地址:https://www.zhihuclub.com/98310.shtml

关于作者: 智云科技

热门文章

评论已关闭

31条评论

  1. com 20 E2 AD 90 20Viagra 20Reklamlar 20 20Apotik 20Jual 20Viagra 20Di 20Surabaya viagra reklamlar The wild card is a new anti euro party, the Alternative for Germany AfD, which was polling around 4 percent and so could manage to vault over the 5 threshold needed to win seats in parliament on election night In the next step, we examined the effect of ALA and DHLA on enzymatic activity of protein tyrosine phosphatases PTP1B and SHP2

  2. levitra voltaren fitil fiyat 2020 The decision to release the men has stirred anguish in Israel, particularly among the relatives of those killed in attacks

  3. To investigate the molecular basis of acquired metal resistance, we previously isolated arsenite resistant KAS cells

  4. Severe symptoms include infection in your jaw bone HER2 was initially a prognostic biomarker, with HER2 positive malignancy prognostic of a worse outcome than HER2 negative malignancy 14

  5. This is in contrast to the often reported depression and malaise that male users suffer with the use of clomiphine citrate The new compounds contain imines with pKa values that result in a substantial amount of the imine being protonated under physiological conditions

  6. A randomized double blind placebo controlled trial of a Chinese herbal medicine preparation Jiawei Qing e Fang for hot flashes and quality of life in perimenopausal women

  7. Extrapyramidal adverse events associated with atypical antipsychotic treatment of bipolar disorder Interventions are not well developed and researchers are currently evaluating computerbased activities, group sessions and use of ginko biloba to improve cognitive functioning Vardy Dhillon

  8. In the 4 week treatment group, 35 patients 76 That s why is recommended to add Aromatase Inhibitors AIs that are also anti estrogens but they are more powerful, such as Arimidex, Aromasin or Femara

  9. Diabetic status was not related to the use of calcium channel blockers as monotherapy, but use of calcium channel blockers in combination with other drugs not shown was higher in diabetics than in nondiabetics 45

  10. Adipose OGT suppresses lipolysis and promotes obesity Therefore, we conclude that the abnormalities which we have observed are due to the long term estrogenic effects of these drugs

  11. Human studies are just now beginning to determine if the results seen in animals will also be found in people Other less common side effects include fatigue, headache, and nausea

  12. Earlier this week, Reutersreported that Etisalat had hired Goldman Sachs Inc toadvise on a potential bid for Warid

  13. Subjects homozygous for this mutation MTHFR 677TT genotype exhibit reduced MTHFR activity and increased risk for a wide variety of cancers 41 43, but the evidence of an association between this polymorphism and cancer is inconsistent, with some reports suggesting a reduction in colorectal cancer risk with the T allele 44 I petted him and told him he was okay, and after a few hours he was his old self again

  14. These visual disturbances are usually reversible; however, cases of prolonged visual disturbance have been reported with some occurring after clomiphene discontinuation

  15. They include Tamoxifen Nolvadex 0 mg kg, or vehicle 1 aqueous solution wt vol of carboxymethyl cellulose by oral gavage once daily for 28 days

  16. When I mention this to both my GYN and ONC they seem to think that it s normal to be this irregular He helped with Memorial Day services and was active in mowing and maintaining the Belvidere Cemetery

  17. rocaltrol panadol hot 500mg english In a list of forecasts, the IGD said 3 out of every extra 4 we spend as a nation on food over the coming five years will go either on the discounters, the internet or convenience stores

  18. Clinical and genomic characterization of treatment emergent small cell neuroendocrine prostate cancer a multi institutional prospective study As the outcomes of AHF patients has been linked with the triggering factors, we make the hypothesis that early i

网站地图