您的位置 首页 java

Java并发中级知识整理

接上篇

并发锁和容器

抽象同步队列AQS

AQS是Java并发包的基础,提供了一个实现阻塞锁和相关同步器(信号量,事件等)的框架,依赖于先进先出(FIFO)等待队列。

AQS双向等待队列和单向条件队列

AQS的state变量是AQS同步的关键,由volatile修饰,保证可见性,在不同的锁和同步器的实现中,state有不同的含义。

AQS的head和tail都是Node类型,也由volatile修饰,他们形成双向的等待队列,其中head是哨兵。

Node中的thread变量用来存放进入AQS队列里面的 线程 ,Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的;waitStatus变量记录当前线程等待状态,可以为CANCELLED(线程被取消了,值为1)、SIGNAL(线程需要被唤醒,值为-1)、CONDITION(线程在条件队列里面等待,值为-2)、PROPAGATE(释放共享资源时需要通知其他节点,值为-3); prev记录当前节点的前驱节点,next记录当前节点的后继节点。

AQS的ConditionObject内部类,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,这个条件队列的头、尾元素分别为firstWaiter和lastWaiter。链表队列的指针是Node的nextWaiter变量。

JDK中的rt.jar包里面的LockSupport是个工具类,它的主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础。LockSupport类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport类的方法的线程是不持有许可证的,因此会被挂起。park挂起,unpark唤醒。

AQS通过CAS操作来竞争共享资源,通过 lock Support类来挂起和唤醒线程。

ReentrantLock锁

ReentrantLock是可重入的独占锁,同时只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞而被放入该锁的AQS阻塞队列里面。

state可以用来表示当前线程获取锁的可重入次数。

ReentrantReadWriteLock锁

ReentrantReadWriteLock采用读写分离的策略,允许多个线程可以同时获取读锁,适合写少读多的场景。内部维护了一个ReadLock和一个WriteLock。读锁是共享锁,写锁是独占锁。

state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数。

StampedLock锁

StampedLock提供的三种读写模式的锁:写锁writeLock,悲观读锁readLock和乐观读锁tryOptimisticRead。都是不可重入锁,获取乐观读锁时不需要进行CAS操作设置锁的状态,而只是简单地测试状态,所以需要复制一份要操作的变量到方法栈,使用快照来保持数据一致性,在具体操作数据前还需要调用validate方法验证。

CountDownLatch

适用于需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。

state用来表示 计数器 当前的值。

当线程调用await方法后,当前线程会被放入AQS的阻塞队列等待计数器为0再返回。

其他线程调用countDown方法使用CAS将计数器值减1, CAS失败则循环重试;如果计数器值为0,唤醒因调用CountDownLatch的await方法而被阻塞的线程,具体是调用AQS的doReleaseShared方法来激活阻塞的线程。

CyclicBarrier

CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。适合分段任务有序执行的场景。

基于ReentLock和Condition实现,调用CyclicBarrier的wait方法会把当前线程放在条件队列里面,所有线程都调用之后,会使用条件队列的signall唤醒所有线程。

Semaphore

Semaphore类似一个资源池,每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。Semaphore内部的计数器是递增的,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。

state用来表示当前可用信号的个数。

Phaser

Phaser用来解决控制多个线程分阶段共同完成任务的情景问题。其作用相比CountDownLatch和CyclicBarrier更加灵活,它不需要在构造的时候指定固定数目的 parties

CompletableFuture

CompletableFuture是Java8提供的用于异步编程的工具类。

并发原子操作

JDK提供的原子性操作类都是使用非阻塞算法CAS实现的,相比使用锁实现原子性操作这在性能上有很大提高。

并发列表

CopyOnWriteArrayList是一个 线程安全 的ArrayList,对其进行的修改操作都是在底层的一个复制的数组(快照)上进行的,使用了写时复制策略。每个CopyOnWriteArrayList对象里面有一个array数组对象用来存放具体元素,ReentrantLock独占锁对象用来保证同时只有一个线程对array进行增删改。另外CopyOnWriteArrayList提供了弱一致性的迭代器,从而保证在获取迭代器后,其他线程对list的修改是不可见的,迭代器遍历的数组是一个快照。

并发Map

ConcurrentHashMap是线程安全的HashMap。

无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。JDK1.8的ConcurrentHashMap取消了Segment分段锁,采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构类似,数组+链表/红黑二叉树。Java 8在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为O(N))转换为红黑树(寻址时间复杂度为O(log(N)))。synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。

ConcurrentSkipListMap使用跳表的数据结构进行快速查找。

跳表的本质是同时维护了多个链表,并且链表是分层的。最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。

并发队列

并发队列分为阻塞队列和非阻塞队列,前者使用锁实现,而后者则使用CAS非阻塞算法实现。

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,其底层数据结构使用单向链表实现,对于入队和出队操作使用无限循环的CAS来实现线程安全,使用CPU资源换取阻塞所带来的开销。默认头、尾节点都是指向item为null的哨兵节点。新元素会被插入队列末尾,出队时从队列头部获取一个元素。

LinkedBlockingQueue 是线程安全的有界阻塞队列,默认队列容量为0x7fffffff,使用单向链表实现。有两个ReentrantLock的实例,分别用来控制元素入队和出队的原子性,其中takeLock用来控制同时只有一个线程可以从队列头获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必须等待。另外,notEmpty和notFull是条件变量,它们内部都有一个条件队列用来存放进队和出队时被阻塞的线程。

ArrayBlockingQueue 是线程安全的有界阻塞队列,使用有界数组实现,不扩容。有一个ReentrantLock的实例保证出、入队操作的原子性,这保证了同时只有一个线程可以进行入队、出队操作。另外,notEmpty、notFull条件变量用来进行出、入队的同步。

PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队始终保证出队的元素是堆树的根节点,因此都返回优先级最高或者最低的元素。其内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证有序。默认使用对象的compareTo方法提供比较规则,如果你需要自定义比较规则则可以自定义comparators。使用数组作为存储元素的数据结构,初始容量是11,有一个allocationSpinLock的int变量,是一个自旋锁,其使用CAS操作来保证同时只有一个线程可以扩容队列,状态为0或者1,其中0表示当前没有进行扩容,1表示当前正在扩容,扩容时如果小于64,扩容一倍加2,否则扩容50%。PriorityBlockingQueue类似于ArrayBlockingQueue,在内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队操作。只使用了一个notEmpty条件变量,用于take操作。

DelayQueue 是无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最快要过期的元素。内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。另外,队列里面的元素要实现Delayed接口,由于每个元素都有一个过期时间,所以要实现获知当前元素还剩下多少时间就过期了的接口。条件变量available与lock锁是对应的,其目的是为了实现线程间同步。

线程池

为什么使用线程池

可以降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

可以提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行。

可以提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池原理

corePoolSize:线程池核心线程个数。定义了最小可以同时运行的线程数量。

workQueue:用于保存等待执行的任务的阻塞队列,比如基于数组的有界ArrayBlockingQueue、基于链表的无界LinkedBlockingQueue、最多只有一个元素的同步队列SynchronousQueue及优先级队列PriorityBlockingQueue等。当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

maximunPoolSize:线程池最大线程数量。当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

ThreadFactory:创建线程的工厂。

RejectedExecutionHandler:饱和策略,当队列满并且线程个数达到maximunPoolSize后采取的策略,比如AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用者所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)及DiscardPolicy(默默丢弃,不抛出异常)。

keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。

TimeUnit:存活时间的时间单位。

newFixedThreadPool :创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。

newSingleThreadExecutor:创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。

newCachedThreadPool :创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列。keeyAliveTime=60说明只要当前线程在60s内空闲则回收。这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。

ScheduledThreadPoolExecutor是一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。内部使用DelayQueue来存放具体任务。任务分为三种,其中一次性执行任务执行完毕就结束了,fixed-delay任务保证同一个任务在多次执行之间间隔固定时间,fixed-rate任务保证按照固定的频率执行

线程池大小的设置

对于计算密集的任务,在拥有 N 个处理器的系统上,当线程池大小为 N+1 时,通常能实现最优的利用率。(即使当计算密集型的线程偶尔由于页缺失故障或者其它原因而暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费。)

对于包含 I/O 操作或者其它阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。为了正确地设置线程池的长度,需要估算出任务花在等待的时间与用来计算的时间的比率,为了保持处理器达到期望的使用率,最优的池的大小等于 CPU数量 * 目标CPU的使用率 * (1+等待时间除以计算时间的比率)

当任务需要某种通过资源池来管理的资源时,例如数据库连接,那么线程池和资源池的大小将会相互影响。如果每个任务都需要一个数据库连接,那么连接池的大小就限制了线程池的大小。同样,当线程池中的任务是数据库连接的唯一使用者时,那么线程池的大小又将限制连接池的大小。

通过累加出每个任务需要的资源的总量,然后除以可用的总量,得出的结果是线程池大小的上限。

并发实践案例

Logback的异步日志模型是一个多生产者-单消费者模型,其通过使用队列把同步日志打印转换为了异步,业务线程只需要通过调用异步appender把日志任务放入日志队列,而日志线程则负责使用同步的appender进行具体的日志打印。日志打印线程只需要负责生产日志并将其放入队列,而不需要关心消费线程何时把日志具体写入磁盘。logback使用的是有界队列ArrayBlockingQueue,之所以使用有界队列是考虑内存溢出问题。在高并发下写日志的QPS会很高,如果设置为无界队列,队列本身会占用很大的内存,很可能会造成OOM。

Tomcat使用队列把接受请求与处理请求操作进行解耦,实现异步处理。其实Tomcat中NioEndPoint中的每个Poller里面都维护一个ConcurrentLinkedQueue,用来缓存请求任务,其本身也是一个多生产者-单消费者模型。

线程池使用FutureTask时如果把拒绝策略设置为DiscardPolicy和DiscardOldestPolicy,并且在被拒绝的任务的Future对象上调用了无参get方法,那么调用线程会一直被阻塞。所以当使用Future时,尽量使用带超时时间的get方法,或者重写拒绝策略修改FutureTask的状态。

参考资料

  1. Javadoop:
  2. Java并发编程之美

文章来源:智云一二三科技

文章标题:Java并发中级知识整理

文章地址:https://www.zhihuclub.com/176654.shtml

关于作者: 智云科技

热门文章

网站地图