您的位置 首页 java

Java 并发编程,看这一篇就够了

1. 线程基基础

进程是程序运行资源分配的最小单位

进程是操作系统进行资源分配的最小单位,其中资源包括:CPU、 内存空间、磁盘 IO 等,同一进程中的多条 线程 共享该进程中的全部系统资源,而进程和进程之间是相互独立的。进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。

线程是 CPU 调度的最小单位,必须依赖于进程而存在

线程是进程的一个实体,是 CPU 调度和分派的基本单位,它是比进程更小的、能独立运行的基本单位。 线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。

CPU时间片轮转机制

时间片轮转调度是一种最古老、 最简单、 最公平且使用最广的算法,又称 RR调度。 每个进程被分配一个时间段,称作它的时间片,即该进程允许运行的时间。百度百科对 CPU 时间片轮转机制原理解释如下:如果在时间片结束时进程还在运行,则 CPU 将被剥夺并分配给另一个进程。如果进程在时间片结束前阻塞或结束,则 CPU 当即进行切换。 调度程序所要做的就是维护一张就绪进程列表,当进程用完它的时间片后,它被移到队列的末尾,时间片轮转调度中唯一有趣的一点是时间片的长度。从一个进程切换到另一个进程是需要定时间的,包括保存和装入寄存器值及内存映像,更新各种表格和队列等。

CPU 核心数和 线程数 的关系

多核心:也指单芯片多处理器( Chip Multiprocessors,简称 CMP),CMP 是由美国斯坦福大学提出的,其思想是将大规模并行处理器中的 SMP(对称多处理器)集成到同一芯片内,各个处理器并行执行不同的进程。 这种依靠多个 CPU 同时并行地运行程序是实现超高速计算的一个重要方向,称为并行处理 多线程 : Simultaneous Multithreading.简称 SMT.让同一个处理器上的多个线程同步执行并共享处理器的执行资源。核心数、 线程数:目前主流 CPU 都是多核的。 增加核心数目就是为了增加线程数,因为操作系统是通过线程来执行任务的,一般情况下它们是 1:1 对应关系,也就是说四核 CPU 一般拥有四个线程。但 Intel 引入超线程技术后,使核心数与线程数形成 1:2 的关系。

高并发编程的意义、好处和注意事项

(1)充分利用 CPU 的资源

(2)加快响应用户的时间

(3)可以使你的代码模块化,异步化,简单化

多线程程序需要注意事项

(1)线程之间的安全性

(2)线程之间的死锁

(3)线程太多了会将服务器资源耗尽形成死机当机

Java 程序天生就是多线程的

一个 Java 程序从 main()方法开始执行, 然后按照既定的代码逻辑执行, 看似没有其他线程参与, 但实际上 Java 程序天生就是多线程程序, 因为执行 main()方法的是一个名称为 main 的线程。

Attach Listener

Attach Listener线程是负责接收到外部的命令,而对该命令进行执行的并且吧结果返回给发送者。通常我们会用一些命令去要求 JVM 给我们一些反 馈信息,如:java -version、jmap、jstack等等。如果该线程在jvm启动的时候没有初始化,那么,则会在用户第一次执行jvm命令时,得到启动。

Signal Dispatcher

前面我们提到第一个Attach Listener线程的职责是接收外部jvm命令,当命令接收成功后,会交给signal dispather线程去进行分发到各个不同的模块处理命令,并且返回处理结果。signal dispather线程也是在第一次接收外部jvm命令时,进行初始化工作。

finalize r

这个线程也是在main线程之后创建的,其优先级为10,主要用于在垃圾收集前,调用对象的finalize()方法;关于Finalizer线程的几点:

1. 只有当开始一轮垃圾收集时,才会开始调用finalize()方法;因此并不是所有对象的finalize()方法都会被执行;

2. 该线程也是daemon线程,因此如果虚拟机中没有其他非daemon线程,不管该线程有没有执行完finalize()方法,JVM也会退出;

3. JVM在垃圾收集时会将失去引用的对象包装成Finalizer对象(Reference的实现),并放入ReferenceQueue,由Finalizer线程来处理;最后将该Finalizer对象的引用置为null,由垃圾收集器来回收;

4. JVM为什么要单独用一个线程来执行finalize()方法呢?如果JVM的垃圾收集线程自己来做,很有可能由于在finalize()方法中误操作导致GC线程停止或不可控,这对GC线程来说是一种灾难;

Reference Handler

VM在创建main线程后就创建Reference Handler线程,其优先级最高,为10,它主要用于处理引用对象本身(软引用、弱引用、虚引用)的垃圾回收问题。

Monitor Ctrl-Break

IntelliJ IDEA执行用户代码的时候,实际是通过反射方式去调用,而与此同时会创建一个Monitor Ctrl ;

eclipse 则不会创建。

线程启动

启动线程的方式有:

1、 X extends Thread;, 然后 X.start

2、 X implements Runnable; 然后交给 Thread 运行

线程停止

安全的中止则是其他线程通过调用某个线程 A 的 interrupt ()方法对其进行中断操作, 中断好比其他线程对该线程打了个招呼, “A, 你要中断了” , 不代表线程 A 会立即停止自己的工作, 同样的 A 线程完全可以不理会这种中断请求。线程通过检查自身的中断标志位是否被置为 true 来进行响应,线程通过方法 isInterrupted()来进行判断是否被中断, 也可以调用静态方法Thread.interrupted()来进行判断当前线程是否被中断, 不过 Thread.interrupted()会同时将中断标识位改写为 false。

interrupt() 、 isInterrupted()、interrupted()

interrupt() :通知线程中断,将中断标识更改为true

isInterrupted() :判断是否中断,线程中断返回true,未中断返回false , 调用之后中断标识依然是true,即不改变中断标识。

interrupted():静态方法中断,线程中断返回true,未中断返回false , 调用之后中断标识复位,即为false

join 方法

把指定的线程加入到当前线程, 可以将两个交替执行的线程合并为顺序执行。比如在线程 B 中调用了线程 A 的 Join()方法, 直到线程 A 执行完毕后, 才会继续行线程 B。

线程的优先级

在 Java 线程中, 通过一个整型成员变量 priority 来控制优先级, 优先级的范围从 1~10, 在线程构建的时候可以通过 setPriority(int)方法来修改优先级, 默认优先级是 5, 优先级高的线程分配时间片的数量要多于优先级低的线程。设置线程优先级时, 针对频繁阻塞(休眠或者 I/O 操作) 的线程需要设置较高优先级, 而偏重计算(需要较多 CPU 时间或者偏运算) 的线程则设置较低的优先级, 确保处理器不会被独占。 在不同的 JVM 以及操作系统上, 线程规划会存在差异, 有些操作系统甚至会忽略对线程优先级的设定。

线程的调度

线程调度是指系统为线程分配 CPU 使用权的过程, 主要调度方式有两种:

协同式线程调度(Cooperative Threads-Scheduling)

抢占式线程调度(Preemptive Threads-Scheduling)

使用协同式线程调度的多线程系统, 线程执行的时间由线程本身来控制, 线程把自己的工作执行完之后, 要主动通知系统切换到另外一个线程上。 使用协同式线程调度的最大好处是实现简单, 由于线程要把自己的事情做完后才会通知系统进行线程切换, 所以没有线程同步的问题, 但是坏处也很明显, 如果一个线程出了问题, 则程序就会一直阻塞。

使用抢占式线程调度的多线程系统, 每个线程执行的时间以及是否切换都由系统决定。 在这种情况下, 线程的执行时间不可控, 所以不会有「一个线程导致整个进程阻塞」 的问题出现。

守护线程

Daemon(守护) 线程是一种支持型线程, 因为它主要被用作程序中后台调度以及支持性工作。 这意味着, 当一个 Java 虚拟机中不存在非 Daemon 线程的时候, Java 虚拟机将会退出。 可以通过调用 Thread.setDaemon(true)将线程设置为 Daemon 线程。 我们一般用不上, 比如垃圾回收线程就是 Daemon 线程。Daemon 线程被用作完成支持性工作, 但是在 Java 虚拟机退出时 Daemon 线程中的 finally 块并不一定会执行。 在构建 Daemon 线程时, 不能依靠 finally 块中的内容来确保执行关闭或清理资源的逻辑。

2. 线程之间的共享

synchronized 内置锁

Java 支持多个线程同时访问一个对象或者对象的成员变量, 关键字synchronized 可以修饰方法或者以同步块的形式来进行使用, 它主要确保多个线程在同一个时刻, 只能有一个线程处于方法或者同步块中, 它保证了线程对变量访问的可见性和排他性, 又称为内置锁机制。

对象锁和类锁:

对象锁是用于对象实例方法, 或者一个对象实例上的, 类锁是用于类的静态方法或者一个类的 class 对象上的。 我们知道, 类的对象实例可以有很多个, 但是每个类只有一个 class 对象, 所以不同对象实例的对象锁是互不干扰的, 但是每个类只有一个类锁。但是有一点必须注意的是, 其实类锁只是一个概念上的东西, 并不是真实存在的, 类锁其实锁的是每个类的对应的 class 对象。 类锁和对象锁之间也是互不干扰的。

Volatile

1.变量的值,从主内存拿,在本地cpu内存中计算,结束之后会更新到主内存,使用volatile,保证了变量的可见性,每个线程在本地内存修改了变量,主内存都能立即拿到,保证的变量的可见性。

2.volatile关键字和CAS保证了变量的原子性和可见性

对于volatile修饰的变量,jvm虚拟机只是保证从主内存加载到线程工作内存的值是最新的。

ThreadLocal

解决什么问题:ThreadLocal不是用来解决对象共享访问问题的,而主要是提供了保持对象的方法和避免参数传递的方便的对象访问方式。

每个线程Thread都会维护自己的ThreadLocalMap,这个map的key则是ThreadLocal类本身,而value则是我们保存的数据。ThreadLocal在多线程中是被公共持有的,被隔离的数据实际是存放在每个线程的ThreadLocalMap中的,只不过是通过ThreadLocal的引用得到每个线程维护的ThreadLocalMap中的value。

内存泄漏

JVM 利用设置 ThreadLocalMap 的 Key 为弱引用, 来避免内存泄露。

JVM 利用调用 remove、 get、 set 方法的时候, 回收弱引用。

当 ThreadLocal 存储很多 Key 为 null 的 Entry 的时候, 而不再去调用 remove、get、 set 方法, 那么将导致内存泄漏。

3. 线程之间的协作

线程之间相互配合, 完成某项工作, 比如: 一个线程修改了一个对象的值,而另一个线程感知到了变化, 然后进行相应的操作, 整个过程开始于一个线程,而最终执行又是另一个线程。 前者是生产者, 后者就是消费者, 这种模式隔离了“做什么” (what) 和“怎么做” (How) , 简单的办法是让消费者线程不断地循环检查变量是否符合预期在 while 循环中设置不满足的条件, 如果条件满足则退出 while 循环, 从而完成消费者的工作。 却存在如下问题:

1) 难以确保及时性。

2) 难以降低开销。 如果降低睡眠的时间, 比如休眠 1 毫秒, 这样消费者能更加迅速地发现条件变化, 但是却可能消耗更多的处理器资源, 造成了无端的浪费。

等待/通知机制

是指一个线程 A 调用了对象 O 的 wait()方法进入等待状态, 而另一个线程 B调用了对象 O 的 notify()或者 notifyAll()方法,线程 A 收到通知后从对象 O 的 wait()

方法返回, 进而执行后续操作。 上述两个线程通过对象 O 来完成交互, 而对象上的 wait()和 notify/notifyAll()的关系就如同开关信号一样, 用来完成等待方和通知方之间的交互工作。

notify():

通知一个在对象上等待的线程,使其从 wait 方法返回,而返回的前提是该线程获取到了对象的锁, 没有获得锁的线程重新进入 WAITING 状态。

notifyAll():

通知所有等待在该对象上的线程

wait()

调用该方法的线程进入 WAITING 状态,只有等待另外线程的通知或被中断才会返回.需要注意,调用 wait()方法后,会释放对象的锁

wait(long)

超时等待一段时间,这里的参数时间是毫秒,也就是等待长达 n 毫秒,如果没有通知就超时返回

wait (long,int)

对于超时时间更细粒度的控制,可以达到纳秒

等待和通知的标准范式

等待方遵循如下原则。

1) 获取对象的锁。

2) 如果条件不满足, 那么调用对象的 wait()方法, 被通知后仍要检查条件。

3) 条件满足则执行对应的逻辑。

Synchronized(对象){

While(条件不满足){

对象.wait();

}

处理逻辑

}

通知方遵循如下原则。

1) 获得对象的锁。

2) 改变条件。

3) 通知所有等待在对象上的线程。

Synchronized(对象){

改变条件

对象.notifyAll();

}

调用 yield() 、 sleep()、 wait()、 notify()等方法对锁有何影响?

yield() 、 sleep()被调用后, 都不会释放当前线程所持有的锁。

调用 wait()方法后, 会释放当前线程持有的锁, 而且当前被唤醒后, 会重新去竞争锁, 锁竞争到后才会执行 wait 方法后面的代码。

调用 notify()系列方法后, 对锁无影响, 线程只有在 syn 同步代码执行完后才会自然而然的释放锁, 所以 notify()系列方法一般都是 syn 同步代码的最后一行。

4. 并发工具

Fork-Join

分而治之

分治法的设计思想是: 将一个难以直接解决的大问题, 分割成一些规模较小的相同问题, 以便各个击破, 分而治之。分治策略是: 对于一个规模为 n 的问题, 若该问题可以容易地解决(比如说规模 n 较小) 则直接解决, 否则将其分解为 k 个规模较小的子问题, 这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题, 然后将各子问题的解合并得到原问题的解。 这种算法设计策略叫做分治法。

归并排序

归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法的一个非常典型的应用。 将已有序的子序列合并, 得到完全有序的序列; 即先使每个子序列有序, 再使子序列段间有序。

若将两个有序表合并成一个有序表, 称为 2-路归并, 与之对应的还有多路归并。

对于给定的一组数据, 利用递归与分治技术将数据序列划分成为越来越小的半子表, 在对半子表排序后, 再用递归方法将排好序的半子表合并成为越来越大的有序序列。

为了提升性能, 有时我们在半子表的个数小于某个数(比如 15) 的情况下,对半子表的排序采用其他排序算法, 比如插入排序。

Fork/Join 使用的标准范式

Java 并发编程,看这一篇就够了

FORK/JOIN

我们要使用 ForkJoin 框架, 必须首先创建一个 ForkJoin 任务。 它提供在任务中执行 fork 和 join 的操作机制, 通常我们不直接继承 ForkjoinTask 类, 只需要直接继承其子类。

1. RecursiveAction, 用于没有返回结果的任务

2. RecursiveTask, 用于有返回值的任务

task 要通过 ForkJoinPool 来执行, 使用 submit 或 invoke 提交, 两者的区别是: invoke 是同步执行, 调用之后需要等待任务完成, 才能执行后面的代码;submit 是异步执行。

join()和 get 方法当任务完成的时候返回计算结果。

CountdownLatch

应用场景:

1.某个线程在开始运行前等待n个线程执行完毕运行

2.主线程等待多个组件加载完毕

CountDownLatch的用法

CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

CyclicBarrier

CyclicBarrier让一组线程达到屏障,被拦截, await方法即屏障,当最后一个线程到达屏障后,所有到达屏障被阻塞的线程才会继续运行。

CyclicBarrier的CyclicBarrie(r int parties,RunnablebarrierAction) 构造方法,可以让现场达到屏障后,运行RunnablebarrierAction,然后被阻塞的现场才被运行。

CyclicBarrier 的字面意思是可循环使用(Cyclic) 的屏障(Barrier) 。 它要做的事情是, 让一组线程到达一个屏障(也可以叫同步点) 时被阻塞, 直到最后一个线程到达屏障时, 屏障才会开门, 所有被屏障拦截的线程才会继续运行。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties) , 其参数表示屏障拦截的线程数量, 每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障, 然后当前线程被阻塞。

CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrie(r int parties,RunnablebarrierAction) , 用于在线程到达屏障时, 优先执行 barrierAction, 方便处理更复杂的业务场景。

Semaphore

Semaphore: 设置许可证个数之后,调用release方法是可以继续添加个数的,不会受初始化设置的格式限制

Semaphore(信号量) 是用来控制同时访问特定资源的线程数量, 它通过协调各个线程, 以保证合理的使用公共资源。

应用场景

Semaphore 可以用于做流量控制, 特别是公用资源有限的应用场景, 比如数据库连接。 假如有一个需求,要读取几万个文件的数据, 因为都是 IO 密集型任务, 我们可以启动几十个线程并发地读取, 但是如果读到内存后, 还需要存储到数据库中, 而数据库的连接数只有 10 个, 这时我们必须控制只有 10 个线程同时获取数据库连接保存数据, 否则会报错无法获取数据库连接。 这个时候, 就可以使用 Semaphore 来做流量控制。 。 Semaphore 的构造方法 Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。 Semaphore 的用法也很简单, 首先线程使用 Semaphore的 acquire()方法获取一个许可证, 使用完之后调用 release()方法归还许可证。 还可以用 tryAcquire()方法尝试获取许可证。

Semaphore 还提供一些其他方法, 具体如下。

•intavailablePermits(): 返回此信号量中当前可用的许可证数。

•intgetQueueLength(): 返回正在等待获取许可证的线程数。

•booleanhasQueuedThreads(): 是否有线程正在等待获取许可证。

•void reducePermit(s int reduction): 减少 reduction 个许可证, 是个 protected方法。

•Collection getQueuedThreads(): 返回所有等待获取许可证的线程集合, 是个 protected 方法。

5. CAS

什么是 原子操作 ?如何实现原子操作?

CAS是英文单词Compare and Swap的缩写,翻译过来就是比较并替换。

CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。

CAS 的基本思路就是, 如果这个地址上的值和期望的值相等, 则给其赋予新值, 否则不做任何事儿, 但是要返回原值是多少。 循环 CAS 就是在一个循环里不断的做 cas 操作, 直到成功为止。

Java 并发编程,看这一篇就够了

CAS

CAS 实现原子操作的三大问题

ABA 问题

因为 CAS 需要在操作值的时候, 检查值有没有发生变化, 如果没有发生变化则更新, 但是如果一个值原来是 A, 变成了 B, 又变成了 A, 那么使用 CAS 进行检查时会发现它的值没有发生变化, 但是实际上却变化了。

循环时间长开销大

自旋 CAS 如果长时间不成功, 会给 CPU 带来非常大的执行开销。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作, 但是对多个共享变量操作时, 循环 CAS 就无法保证操作的原子性, 这个时候就可以用锁。

原子操作类

更新基本类型类:AtomicBoolean,AtomicInteger,AtomicLong

更新数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

更新引用类型:AtomicReference,AtomicMarkableReference,AtomicStampedReference

原子更新字段类: AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater

AtomicInteger

•int addAndGet(int delta) : 以原子方式将输入的数值与实例中的值(AtomicInteger 里的 value) 相加, 并返回结果。

•boolean compareAndSet(int expect, int update) : 如果输入的数值等于预期值, 则以原子方式将该值设置为输入的值。

•int getAndIncrement(): 以原子方式将当前值加 1, 注意, 这里返回的是自增前的值。

•int getAndSet(int newValue) : 以原子方式设置为 newValue 的值, 并返回旧值。

6. 显示锁

显式锁

Java 并发编程,看这一篇就够了

JAVA并发编程

有了 synchronized 为什么还要 lock

Java 程序是靠 synchronized 关键字实现锁功能的, 使用 synchronized 关键字将会隐式地获取锁, 但是它将锁的获取和释放固化了, 也就是先获取再释放。

Lock 的标准用法

Java 并发编程,看这一篇就够了

在 finally 块中释放锁, 目的是保证在获取到锁之后, 最终能够被释放。不要将获取锁的过程写在 try 块中, 因为如果在获取锁(自定义锁的实现)时发生了异常, 异常抛出的同时, 也会导致锁无故释放。

Java 并发编程,看这一篇就够了

ReentrantLock

锁的可重入

简单地讲就是: “同一个线程对于已经获得到的锁, 可以多次继续申请到该锁的使用权”。 而 synchronized 关键字隐式的支持重进入, 比如一个 synchronized修饰的递归方法, 在方法执行时, 执行线程在获取了锁之后仍能连续多次地获得该锁。 ReentrantLock 在调用 lock()方法时, 已经获取到锁的线程, 能够再次调用lock()方法获取锁而不被阻塞。

公平和非公平锁

如果在时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之, 是不公平的。 公平的获取锁, 也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。 ReentrantLock 提供了一个构造函数, 能够控制锁是否是公平的。 事实上, 公平的锁机制往往没有非公平的效率高。

Condition 接口

任意一个 Java 对象, 都拥有一组监视器方法(定义在 java.lang.Object 上) ,主要包括 wait()、 wait(long timeout)、 notify()以及 notifyAll()方法, 这些方法与synchronized 同步关键字配合, 可以实现等待/通知模式。 Condition 接口也提供了类似 Object 的监视器方法, 与 Lock 配合可以实现等待/通知模式。

Java 并发编程,看这一篇就够了

LockSupport

了解 LockSupport

LockSupport 定义了一组的公共静态方法, 这些方法提供了最基本的线程阻塞和唤醒功能, 而 LockSupport 也成为构建同步组件的基础工具。LockSupport 定义了一组以 park 开头的方法用来阻塞当前线程, 以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。 LockSupport 增加了park(Object blocker)、 parkNanos(Object blocker,long nanos)和 parkUntil(Objectblocker,long deadline)3 个方法, 用于实现阻塞当前线程的功能, 其中参数 blocker是用来标识当前线程在等待的对象(以下称为阻塞对象) , 该对象主要用于问题排查和系统监控。

Java 并发编程,看这一篇就够了

1.可以先调用unpark添加许可, 在park的时候直接使用许可

2.park使用一次许可之后,会把许可重置为0,即无论有多少次许可,只能使用一次

3.park线程被阻塞时候,调用了interrupt,线程解除阻塞 , 类似sleep、wait、notify线程被中断会报interruptException

4.与wait和notify比较:

(1)wait和notify都是Object中的方法,在调用这两个方法前必须先获得锁对象,但是park不需要获取某个对象的锁就可以锁住线程。

(2)notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。

7. AQS

AQS即是AbstractQueuedSynchronizer,一个用来构建锁和同步工具的框架,包括常用的ReentrantLock、CountDownLatch、Semaphore等。

AQS没有锁之类的概念,它有个state变量,是个int类型,在不同场合有着不同含义。

AQS围绕state提供两种基本操作“获取”和“释放”,有条双向队列存放阻塞的等待线程,并提供一系列判断和处理方法,简单说几点:

state是独占的,还是共享的;

state被获取后,其他线程需要等待;

state被释放后,唤醒等待线程;

线程等不及时,如何退出等待。

至于线程是否可以获得state,如何释放state,就不是AQS关心的了,要由子类具体实现。

方法:

Java 并发编程,看这一篇就够了

Java 并发编程,看这一篇就够了

8. 线程池

为什么要用线程池?

Java 中的线程池是运用场景最多的并发框架, 几乎所有需要异步或并发执行任务的程序都可以使用线程池。 在开发过程中, 合理地使用线程池能够带来 3个好处。

第一: 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗

第二: 提高响应速度。 当任务到达时, 任务可以不需要等到线程创建就能立即执行。 假设一个服务器完成一项任务所需时间为: T1 创建线程时间, T2 在线程中执行任务的时间, T3 销毁线程时间。 如果: T1 + T3 远大于 T2, 则可以采用线程池, 以提高服务器性能。 线程池技术正是关注如何缩短或调整 T1,T3 时间的技术, 从而提高服务器程序性能的。 它把 T1, T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段, 这样在服务器程序处理客户请求时,不会有 T1, T3 的开销了。

第三: 提高线程的可管理性。 线程是稀缺资源, 如果无限制地创建, 不仅会消耗系统资源, 还会降低系统的稳定性, 使用线程池可以进行统一分配、 调优和

监控。假设一个服务器一天要处理 50000 个请求, 并且每个请求需要一个单独的线程完成。 在线程池中, 线程数一般是固定的, 所以产生线程总数不会超过线程池中线程的数目, 而如果服务器不利用线程池来处理这些请求则线程总数为 50000。一般线程池大小是远小于 50000。 所以利用线程池的服务器程序不会为了创建50000 而在处理请求时浪费时间, 从而提高效率。

ThreadPool Executor 的类关系

Executor 是一个接口, 它是 Executor 框架的基础, 它将任务的提交与任务的执行分离开来。

ExecutorService 接口继承了 Executor, 在其上做了一些 shutdown()、 submit()的扩展, 可以说是真正的线程池接口;

AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法;

ThreadPoolExecutor 是线程池的核心实现类, 用来执行被提交的任务。

ScheduledExecutorService 接口继承了 ExecutorService 接口, 提供了带”周期执行”功能 ExecutorService;

ScheduledThreadPoolExecutor是一个实现类, 可以在给定的延迟后运行命令,或者定期执行命令。 ScheduledThreadPoolExecutor 比 Timer 更灵活, 功能更强大。

线程池的创建各个参数含义

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,longkeepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory

threadFactory,RejectedExecutionHandler handler)

corePoolSize

线程池中的核心线程数, 当提交一个任务时, 线程池创建一个新线程执行任务, 直到当前线程数等于 corePoolSize;

如果当前线程数为 corePoolSize, 继续提交的任务被保存到阻塞队列中, 等待被执行;

如果执行了线程池的 prestartAllCoreThreads()方法, 线程池会提前创建并启动所有核心线程。

maximumPoolSize

线程池中允许的最大线程数。 如果当前阻塞队列满了, 且继续提交任务, 则创建新的线程执行任务, 前提是当前线程数小于 maximumPoolSize

keepAliveTime

线程空闲时的存活时间, 即当线程没有任务执行时, 继续存活的时间。 默认情况下, 该参数只在线程数大于 corePoolSize 时才有用

TimeUnit

keepAliveTime 的时间单位

workQueue

workQueue 必须是 BlockingQueue 阻塞队列。 当线程池中的线程数超过它的corePoolSize 的时候, 线程会进入阻塞队列进行阻塞等待。 通过 workQueue, 线程池实现了阻塞功能

workQueue

用于保存等待执行的任务的阻塞队列, 一般来说, 我们应该尽量使用有界队列, 因为使用无界队列作为工作队列会对线程池带来如下影响。

1) 当线程池中的线程数达到 corePoolSize 后, 新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize。

2) 由于 1, 使用无界队列时 maximumPoolSize 将是一个无效参数。

3) 由于 1 和 2, 使用无界队列时 keepAliveTime 将是一个无效参数。

4) 更重要的, 使用无界 queue 可能会耗尽系统资源, 有界队列则有助于防止资源耗尽, 同时即使使用有界队列, 也要尽量控制队列的大小在一个合适的范围。

所以我们一般会使用, ArrayBlockingQueue、 LinkedBlockingQueue、SynchronousQueue、 PriorityBlockingQueue

threadFactory

创建线程的工厂, 通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名, 当然还可以更加自由的对线程做更多的设置, 比如设置所有的线程为守护线程。

Executors 静态工厂里默认的 threadFactory, 线程的命名规则是“pool-数字-thread-数字

RejectedExecutionHandler

线程池的饱和策略, 当阻塞队列满了, 且没有空闲的工作线程, 如果继续提交任务, 必须采取一种策略处理该任务, 线程池提供了 4 种策略:

( 1) AbortPolicy: 直接抛出异常, 默认策略;

( 2) CallerRunsPolicy: 用调用者所在的线程来执行任务;

( 3) DiscardOldestPolicy: 丢弃阻塞队列中靠最前的任务, 并执行当前任务;

( 4) DiscardPolicy: 直接丢弃任务;

当然也可以根据应用场景实现 RejectedExecutionHandler 接口, 自定义饱和策略, 如记录日志或持久化存储不能处理的任务。

扩展线程池

能扩展线程池的功能吗? 比如在任务执行的前后做一点我们自己的业务工作? 实际上, JDK 的线程池已经为我们预留的接口, 在线程池核心方法中, 有 2个方法是空的, 就是给我们预留的。 还有一个线程池退出时会调用的方法。

可以看到, 每个任务执行前后都会调用 beforeExecute 和 afterExecute 方法。相当于执行了一个切面。 而在调用 shutdown 方法后则会调用 terminated 方法。

线程池的工作机制

1) 如果当前运行的线程少于 corePoolSize, 则创建新线程来执行任务( 注意,执行这一步骤需要获取全局锁) 。

2) 如果运行的线程等于或多于 corePoolSize, 则将任务加入 BlockingQueue。

3) 如果无法将任务加入 BlockingQueue( 队列已满) , 则创建新的线程来处理任务。

4) 如果创建新线程将使当前运行的线程超出 maximumPoolSize, 任务将被拒绝, 并调用 RejectedExecutionHandler.rejectedExecution()方法。

提交任务

execute()方法用于提交不需要返回值的任务, 所以无法判断任务是否被线程池执行成功。

submit()方法用于提交需要返回值的任务。 线程池会返回一个 future 类型的对象, 通过这个 future 对象可以判断任务是否执行成功, 并且可以通过 future的 get()方法来获取返回值, get()方法会阻塞当前线程直到任务完成, 而使用 get( long timeout, TimeUnit unit) 方法则会阻塞当前线程一段时间后立即返回, 这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。 它们的原理是遍历线程池中的工作线程, 然后逐个调用线程的 interrupt 方法来中断线程, 所以无法响应中断的任务可能永远无法终止。 但是它们存在一定的区别,shutdownNow 首先将线程池的状态设置成 STOP, 然后尝试停止所有的正在执行或暂停任务的线程, 并返回等待执行任务的列表, 而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态, 然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个, isShutdown 方法就会返回 true。当所有的任务都已关闭后, 才表示线程池关闭成功, 这时调用 isTerminaed 方法会返回 true。 至于应该调用哪一种方法来关闭线程池, 应该由提交到线程池的任务特性决定, 通常调用 shutdown 方法来关闭线程池, 如果任务不一定要执行完,则可以调用 shutdownNow 方法。

预定义线程池

FixedThreadPool 详解

创建使用固定线程数的 FixedThreadPool 的 API。 适用于为了满足资源管理的需求, 而需要限制当前线程数量的应用场景, 它适用于负载比较重的服务器。

FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为创建

FixedThreadPool 时指定的参数 nThreads。

当线程池中的线程数大于 corePoolSize 时, keepAliveTime 为多余的空闲线程等待新任务的

最长时间, 超过这个时间后多余的线程将被终止。 这里把 keepAliveTime 设置为 0L, 意味着多余的空闲线程会被立即终止。

FixedThreadPool 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE) 。

SingleThreadExecutor

创建使用单个线程的 SingleThread-Executor 的 API, 于需要保证顺序地执行各个任务; 并且在任意时间点, 不会有多个线程是活动的应用场景。

corePoolSize 和 maximumPoolSize 被设置为 1。 其他参数与 FixedThreadPool相同。 SingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工

作队列(队列的容量为 Integer.MAX_VALUE) 。

CachedThreadPool

创建一个会根据需要创建新线程的 CachedThreadPool 的 API。 大小无界的线程池, 适用于执行很多的短期异步任务的小程序, 或者是负载较轻的服务器。

corePoolSize 被设置为 0, 即 corePool 为空; maximumPoolSize 被设置为Integer.MAX_VALUE。 这里把 keepAliveTime 设置为 60L, 意味着 CachedThreadPool

中的空闲线程等待新任务的最长时间为60秒, 空闲线程超过60秒后将会被终止。

FixedThreadPool 和 SingleThreadExecutor 使用有界队列 LinkedBlockingQueue作为线程池的工作队列。 CachedThreadPool 使用没有容量的 SynchronousQueue作为线程池的工作队列, 但 CachedThreadPool 的 maximumPool 是无界的。 这意味着, 如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。 极端情况下, CachedThreadPool 会因为创建过多线程而耗尽 CPU 和内存资源。

合理地配置线程池

要想合理地配置线程池, 就必须首先分析任务特性

要想合理地配置线程池, 就必须首先分析任务特性, 可以从以下几个角度来分析。

•任务的性质: CPU 密集型任务、 IO 密集型任务和混合型任务。

•任务的优先级: 高、 中和低。

•任务的执行时间: 长、 中和短。

•任务的依赖性: 是否依赖其他系统资源, 如数据库连接。性质不同的任务可以用不同规模的线程池分开处理。

CPU 密集型任务应配置尽可能小的线程, 如配置 Ncpu+1 个线程的线程池。

由于 IO 密集型任务线程并不是一直在执行任务, 则应配置尽可能多的线程, 如2*Ncpu。

混合型的任务, 如果可以拆分, 将其拆分成一个 CPU 密集型任务和一个 IO密集型任务, 只要这两个任务执行的时间相差不是太大, 那么分解后执行的吞吐量将高于串行执行的吞吐量。 如果这两个任务执行时间相差太大, 则没必要进行分解。 可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU 个数。

对于 IO 型的任务的最佳线程数, 有个公式可以计算

Nthreads = NCPU * UCPU * (1 + W/C)

其中:

❑ NCPU 是处理器的核的数目

❑ UCPU 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)

❑ W/C 是等待时间与计算时间的比率

等待时间与计算时间我们在 Linux 下使用相关的 vmstat 命令或者 top 命令查看。

优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。 它可以让优先级高的任务先执行。

执行时间不同的任务可以交给不同规模的线程池来处理, 或者可以使用优先级队列, 让执行时间短的任务先执行。

依赖数据库连接池的任务, 因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长, 则 CPU 空闲时间就越长, 那么线程数应该设置得越大, 这样才能更好地利用 CPU。

建议使用有界队列。 有界队列能增加系统的稳定性和预警能力, 可以根据需要设大一点儿, 比如几千。

假设, 我们现在有一个 Web 系统, 里面使用了线程池来处理业务, 在某些情况下, 系统里后台任务线程池的队列和线程池全满了, 不断抛出抛弃任务的异常, 通过排查发现是数据库出现了问题, 导致执行 SQL 变得非常缓慢, 因为后台任务线程池里的任务全是需要向数据库查询和插入数据的, 所以导致线程池里的工作线程全部阻塞, 任务积压在线程池里。

如果当时我们设置成无界队列, 那么线程池的队列就会越来越多, 有可能会撑满内存, 导致整个系统不可用, 而不只是后台任务出现问题。

文章来源:智云一二三科技

文章标题:Java 并发编程,看这一篇就够了

文章地址:https://www.zhihuclub.com/194687.shtml

关于作者: 智云科技

热门文章

网站地图