您的位置 首页 java

实战java高并发程序设计第三章(一)

1. 同步控制

  • synchronized 的扩展:重入锁

同步控制不仅有synchronized配合object.wait()以及object.notify(),也有增强版的reentrantLock(重入锁)

public class Reenter lock  implements Runnable{
 public static ReentrantLock lock=new ReentrantLock();
 public static int i=0;
 @Override
 public void run() {
 for(int j=0;j<10000000;j++){
 lock.lock();
 lock.lock(); //此处演示重入性
 try{
 i++;
 }finally{
 lock.unlock(); //退出临界区必须解锁
 lock.unlock();
 }
 }
 }
 public static void main(String[] args) throws InterruptedException {
 ReenterLock tl=new ReenterLock();
 Thread t1=new Thread(tl);
 Thread t2=new Thread(tl);
 t1. start ();t2.start();
 t1.join();t2.join();
 System.out.println(i); //计算结果为 20000000
 }
}
 

我们来看下reentrantlock相比synchronized锁有何优点:

1.中断响应

面对死锁,似乎synchronized没有任何主动解决策略,而reentrantlock则可以轻松解决

public class IntLock implements Runnable {
 public static ReentrantLock lock1 = new ReentrantLock();
 public static ReentrantLock lock2 = new ReentrantLock();
 int lock;
 /**
 * 控制加锁顺序,方便构造死锁
 * @param lock
 */
 public IntLock(int lock) {
 this.lock = lock;
 }
 @Override
 public void run() {
 try {
 if (lock == 1) {
 lock1.lockInterruptibly(); //可中断的加锁
 try{
 Thread.sleep(500);
 }catch(InterruptedException e){}
 lock2.lockInterruptibly();
 } else {
 lock2.lockInterruptibly();
 try{
 Thread.sleep(500);
 }catch(InterruptedException e){}
 lock1.lockInterruptibly();
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 System.out.println(Thread.currentThread().getName()+": 线程 被中断");
 } finally {
 if (lock1.isHeldByCurrentThread())
 lock1.unlock();
 if (lock2.isHeldByCurrentThread())
 lock2.unlock();
 System.out.println(Thread.currentThread().getName()+":线程退出");
 }
 }
 public static void main(String[] args) throws InterruptedException {
 IntLock r1 = new IntLock(1);
 IntLock r2 = new IntLock(2);
 Thread t1 = new Thread(r1,"线程1");
 Thread t2 = new Thread(r2,"线程2");
 t1.start();t2.start();
 Thread.sleep(1000);
 //中断其中一个线程
 t2.interrupt();
 }
}
// 输出结果:
// java.lang.InterruptedException
// at //java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
// at //java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
// at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
// at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31)
// at java.lang.Thread.run(Thread.java:745)
// 线程2:线程被中断
// 线程2:线程退出
// 线程1:线程退出
 

由上可知,当t1,t2形成死锁时,可以主动利用中断来解开,但完成任务的只有t1,t2被中断. 而如果换成synchronized则将无法进行中断

  1. 1锁申请等待时限
lock1.tryLock(); //尝试获取锁,获得立即返回true,未获得立即返回false
lock1.tryLock(5, TimeUnit.SECONDS); //尝试获取锁,5秒内未获得则返回false,获得返回true
 
public class TryLock implements Runnable {
 public static ReentrantLock lock1 = new ReentrantLock();
 public static ReentrantLock lock2 = new ReentrantLock();
 int lock;
 public TryLock(int lock) {
 this.lock = lock;
 }
 @Override
 public void run() {
 if (lock == 1) {
 while (true) {
 if (lock1.tryLock()) {
 try {
 try {
 Thread.sleep(500);
 } catch (InterruptedException e) {
 }
 if (lock2.tryLock()) {
 try {
 System.out.println(Thread.currentThread()
 .getId() + ":My Job done");
 return;
 } finally {
 lock2.unlock();
 }
 }
 } finally {
 lock1.unlock();
 }
 }
 }
 } else {
 while (true) {
 if (lock2.tryLock()) {
 try {
 try {
 Thread.sleep(500);
 } catch (InterruptedException e) {
 }
 if (lock1.tryLock()) {
 try {
 System.out.println(Thread.currentThread()
 .getId() + ":My Job done");
 return;
 } finally {
 lock1.unlock();
 }
 }
 } finally {
 lock2.unlock();
 }
 }
 }
 }
 }
 public static void main(String[] args) throws InterruptedException {
 TryLock r1 = new TryLock(1);
 TryLock r2 = new TryLock(2);
 Thread t1 = new Thread(r1);
 Thread t2 = new Thread(r2);
 t1.start();
 t2.start();
 }
}
// 15:My Job done
// 14:My Job done
 

使用trylock可以有效地避免产生死锁

  1. 公平锁

synchronized锁为非公平锁,而reentrantLock既可以是公平锁也可以是非公平锁

非公平锁容易产生饥饿,公平锁先进先出,但效率不敌非公平锁

public ReentrantLock(boolean fair)
 

ddd

  • 重入锁的搭档Condition

Condition和object.wait(),object.notify()方法类似

condition的基本方法如下:

void await() throws InterruptedException; //使当前线程等待,释放锁,能响应 signal 和signalAll方法,响应中断
void awaitUninterruptibly(); //类似 await,但不响应中断
long awaitNanos(long nanosTimeout)throws InterruptedException; //等待一段时间
boolean await (long time,TimeUnit unit)throws InterruptedException;
boolean awaitUntil(Date deadline)throws InterruptedException;
void signal(); //唤醒一个等待中的线程
void signalAll(); //唤醒所有等待中的线程
 

JDK内部就有很多对于ReentrantLock的使用,如ArrayBlockingQueue

 //在 ArrayBlockingQueue中的一些定义
 boolean fair = true;
 private final ReentrantLock lock = new ReentrantLock(fair);
 private final Condition notEmpty = lock.newCondition();
 private final Condition notFull = lock.newCondition();
 //put(方法的实现 
 public void put(E e) throws InterruptedException {
 if (e == null) throw new NullPointerException();
 final E[] items = this.items;
 final ReentrantLock lock = this.lock;
 lock.lockInterruptibly(); //put方法做同步
 try {
 try {
 while (count == items.length) //队列已满
 notFull.await(); //等待队列有足够的空间
 } catch (InterruptedException ie) {
 notFull.signal();
 throw ie;
 }
 insert(e); //notFull被通知时,说明有足够的空间
 } finally {
 lock.unlock();
 }
 }
 private void insert(E x) {
 items[putIndex] = x;
 putIndex = inc(putIndex);
 ++count;
 notFull.signal(); //通知take方法的线程,队列已有数据
 }
 public E take() throws InterruptedException {
 final ReentrantLock lock = this.lock;
 lock.lockInterruptibly(); //对take()方法做同步
 try {
 try {
 while (count == 0) //如果队列为空
 notEmpty.await(); //则消费者队列要等待一个非空的信号
 } catch (InterruptedException ie) {
 notEmpty.signal();
 throw ie;
 }
 E x = extract();
 return x;
 } finally {
 lock.unlock();
 }
 }
 private E extract() {
 final E[] items = this.items;
 E x = items[takeIndex];
 items[takeIndex] = null;
 takeIndex = inc(takeIndex);
 --count;
 notFull.signal(); //通知put线程队列已有空闲空间
 return x;
 }
 
  • 多线程同时访问: 信号量 (semaphore)

同步锁只能允许一个线程进行访问,信号量可以指定多个线程同时访问同一个资源.

 //构造方法
 public Semaphore(int permits) //传入int表示能同时访问的线程数
 public Semaphore(int permits, boolean fair) //线程数,是否公平锁
 
 //实例方法
 public void acquire() throws InterruptedException //获取一个访问权限,会阻塞线程,会被打断
 public void acquireUninterruptibly() //获取一个访问权限,会阻塞线程,不会被打断
 public boolean tryAcquire() //获取一个访问权限,立即返回
 public boolean tryAcquire(long timeout, TimeUnit unit) //获取一个访问权限,尝试一段时间
 public void release() //释放一个访问权限
 
public class SemapDemo implements Runnable {
 final Semaphore semp = new Semaphore(5);
 @Override
 public void run() {
 try {
 semp.acquire(); 
 Thread.sleep(2000);
 System.out.println(Thread.currentThread().getId() + ":done!");
 } catch (InterruptedException e) {
 e.printStackTrace();
 } finally {
 semp.release(); //使用完后要释放,否则会引起信号量泄漏
 }
 }
 public static void main(String[] args) {
 ExecutorService exec = Executors.newFixedThreadPool(20);
 final SemapDemo demo = new SemapDemo();
 for (int i = 0; i < 20; i++) {
 exec.submit(demo);
 }
 }
}
 //输出结果
 //每次输出5个结果,对应信号量的5个许可
 
  • 读写锁 ReadWriteLock
  • 读写锁适用于读多写少的场景,读读之间为并行,读写之间为串行,写写之间也为串行
public class ReadWriteLockDemo {
 private static Lock lock=new ReentrantLock();
 private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); //获取读写锁
 private static Lock readLock = readWriteLock.readLock(); //读锁
 private static Lock writeLock = readWriteLock.writeLock(); //写锁
 private int value;
 public Object handleRead(Lock lock) throws InterruptedException{
 try{
 lock.lock(); //模拟读操作
 Thread.sleep(1000); //读操作的耗时越多,读写锁的优势就越明显
 return value; 
 }finally{
 lock.unlock();
 }
 }
 public void handleWrite(Lock lock,int index) throws InterruptedException{
 try{
 lock.lock(); //模拟写操作
 Thread.sleep(1000);
 value=index;
 }finally{
 lock.unlock();
 }
 }
 public static void main(String[] args) {
 final ReadWriteLockDemo demo=new ReadWriteLockDemo();
 Runnable readRunnale=new Runnable() {
 @Override
 public void run() {
 try {
 demo.handleRead(readLock);
// demo.handleRead(lock);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 };
 Runnable writeRunnale=new Runnable() {
 @Override
 public void run() {
 try {
 demo.handleWrite(writeLock,new Random().nextInt());
// demo.handleWrite(lock,new Random().nextInt());
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 };
 for(int i=0;i<18;i++){
 new Thread(readRunnale).start();
 }
 
 for(int i=18;i<20;i++){
 new Thread(writeRunnale).start();
 } 
 }
}
//结果:
//读写锁明显要比单纯的锁要更快结束,说明读写锁确实提升不少效率
 
  • 倒计数器CountDownLatch
  • 让一个线程等待,知道倒计时结束
public class CountDownLatchDemo implements Runnable {
 static final CountDownLatch end = new CountDownLatch(10); //构造倒计时器,倒计数为10
 static final CountDownLatchDemo demo=new CountDownLatchDemo(); 
 @Override
 public void run() {
 try {
 //模拟检查任务
 Thread.sleep(new Random().nextInt(10)*1000);
 System.out.println("check complete");
 end.countDown(); //倒计时器减1
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 public static void main(String[] args) throws InterruptedException {
 ExecutorService exec = Executors.newFixedThreadPool(10);
 for(int i=0;i<10;i++){
 exec.submit(demo);
 }
 //等待检查
 end.await(); //主线程阻塞,待其他线程全部完成后再唤醒主线程
 //发射火箭
 System.out.println("Fire!");
 exec.shutdown();
 }
}
 
  • 循环栅栏CyclicBarrier
  • 循环栅栏类似于倒计时器,但是计数器可以反复使用,cyclicBarrier比CountDownLatch稍微强大些,可以传入一个barrierAction,barrierAction指每次完成计数便出发一次
public CyclicBarrier(int parties,Runnable barrierAction) //构造方法
 
public class CyclicBarrierDemo {
 public static class Soldier implements Runnable {
 private String soldier;
 private final CyclicBarrier cyclic;
 Soldier(CyclicBarrier cyclic, String soldierName) {
 this.cyclic = cyclic;
 this.soldier = soldierName;
 }
 public void run() {
 try {
 //等待所有士兵到齐
 cyclic.await(); //触发一次循环栅栏,达到计数器后才会进行下一步工作
 doWork();
 //等待所有士兵完成工作
 cyclic.await(); //再次触发循环栅栏,达到计数器后才会进行下一步工作
 } catch (InterruptedException e) {
 e.printStackTrace();
 } catch (BrokenBarrierException e) {
 e.printStackTrace();
 }
 }
 void doWork() {
 try {
 Thread.sleep(Math.abs(new Random().nextInt()%10000)); //模拟工作
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 System.out.println(soldier + ":任务完成");
 }
 }
 public static class BarrierRun implements Runnable { //用于传入CyclicBarrier的构造方法,作为达到计数器数值后的触发任务, 可以被多次调用
 boolean flag;
 int N;
 public BarrierRun(boolean flag, int N) {
 this.flag = flag;
 this.N = N;
 }
 public void run() {
 if (flag) {
 System.out.println("司令:[士兵" + N + "个,任务完成!]");
 } else {
 System.out.println("司令:[士兵" + N + "个,集合完毕!]");
 flag = true;
 }
 }
 }
 public static void main(String args[]) throws InterruptedException {
 final int N = 10;
 Thread[] allSoldier=new Thread[N];
 boolean flag = false;
 CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
 //设置屏障点,主要是为了执行这个方法
 System.out.println("集合队伍!");
 for (int i = 0; i < N; ++i) {
 System.out.println("士兵 "+i+" 报道!");
 allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i));
 allSoldier[i].start();
 }
 }
}
 
注意:
一旦其中一个被interrupt后,很可能会抛出一个interruptExpection和9个BrokenBarrierException,表示该循环栅栏已破损,防止其他线程进行无所谓的长久等待
 
  • 线程阻塞工具LockSupport
  • LockSupport是一个非常实用的线程阻塞工具,不需要获取某个对象的锁(如wait),也不会抛出interruptedException异常
public static void park() //挂起当前线程,
public static void park(Object blocker) //挂起当前线程,显示阻塞对象,parking to wait for <地址值>
 
public class LockSupportDemo {
 public static Object u = new Object();
 static ChangeObjectThread t1 = new ChangeObjectThread("t1");
 static ChangeObjectThread t2 = new ChangeObjectThread("t2");
 public static class ChangeObjectThread extends Thread {
 public ChangeObjectThread(String name){
 super.setName(name);
 }
 @Override
 public void run() {
 synchronized (u) {
 System.out.println("in "+getName());
 LockSupport.park(this); 
 }
 }
 }
 public static void main(String[] args) throws InterruptedException {
 t1.start();
 Thread.sleep(100);
 t2.start();
 LockSupport.unpark(t1);
 LockSupport.unpark(t2); //即使unpark发生在park前,也可以使程序正常结束
 t1.join();
 t2.join();
 }
}
 
LockSupport使用了类似信号量的机制,它为每个线程准备一个许可,如果许可可用,park立即返回,并且消费这个许可(转为不可用),如果许可不可用,就会阻塞,而unpark方法就是使一个许可变为可用
locksupport.park()可以相应中断,但是不会抛出interruptedException,我们可以用Thread.interrupted等方法中获取中断标记.
 
public class LockSupportIntDemo {
 public static Object u = new Object();
 static ChangeObjectThread t1 = new ChangeObjectThread("t1");
 static ChangeObjectThread t2 = new ChangeObjectThread("t2");
 public static class ChangeObjectThread extends Thread {
 public ChangeObjectThread(String name){
 super.setName(name);
 }
 @Override
 public void run() {
 synchronized (u) {
 System.out.println("in "+getName());
 LockSupport.park();
 if(Thread.interrupted()){ //检测到中断位,并清除中断状态
 System.out.println(getName()+" 被中断了");
 }
 if (Thread.currentThread().isInterrupted()){ //中断状态已被清除,无法检测到
 System.out.println(1);
 }
 }
 System.out.println(getName()+"执行结束");
 }
 }
 public static void main(String[] args) throws InterruptedException {
 t1.start();
 Thread.sleep(100);
 t2.start();
 t1.interrupt();
 LockSupport.unpark(t2);
 }
}
//输出:
//in t1
//t1 被中断了
//t1执行结束
//in t2
//t2执行结束
 
  • Guava和Limiter限流
  • 限流算法一般有两种:漏桶算法和令牌桶算法
  • 漏桶算法: 利用缓存区,所有请求进入系统,都在缓存区中保存,然后以固定的流速流出缓存区进行处理.
  • 令牌桶算法: 桶中存放令牌,每个请求拿到令牌后才能进行处理,如果没有令牌,请求要么等待,要么丢弃.RateLimiter就是采用这种算法
public class RateLimiterDemo {
 static RateLimiter limiter = RateLimiter.create(2); //每秒处理2个请求
 public static class Task implements Runnable {
 @Override
 public void run() {
 System.out.println(System.currentTimeMillis());
 }
 }
 public static void main(String args[]) throws InterruptedException {
 for (int i = 0; i < 50; i++) {
 limiter.acquire(); //过剩流量会等待
 new Thread(new Task()).start();
 }
 }
}
// 某些场景倾向于丢弃过剩流量,tryAcquire则是立即返回,不会阻塞
// for (int i = 0; i < 50; i++) { 
// if(!limiter.tryAcquire()) {
// continue;
// }
// new Thread(new Task()).start();
// }
 

2. 线程池

  • Executors框架

Executor框架提供了各种类型的线程池,主要有以下工厂方法:

//固定线程数量,当有新任务提交时,若池中有空闲线程则立即执行,若没有空闲线程,任务会被暂存在一个任务队列中,直到有空闲线程
public static ExecutorService newFixedThreadPool(int nThreads)
//返回只有一个线程的线程池,多余任务被保存到一个任务队列中,线程空闲时,按先入先出的顺序执行队列中的任务
public static ExecutorService newSingleThreadPoolExecutor()
//线程数量不固定,优先使用空闲线程,多余任务会创建新线程
public static ExecutorService newCachedThreadPool()
//线程数量为1,给定时间执行某任务,或周期性执行任务
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
//线程数量可以指定,定时或周期性执行任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
 

计划任务:newScheduledThreadPool主要方法

//给定时间,对任务进行一次调度
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//周期调度,以任务完成后间隔固定时间调度下一个任务,(两者相加)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
 long delay,
 TimeUnit unit);
//周期调度,两个任务开始的时间差为固定间隔,如果任务时间大于间隔时间则以任务时间为准(两者取其大者)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
 long period,
 TimeUnit unit);
 

注意:

  • 核心线程池 ThreadPoolExecutor

ThreadPoolExecutor构造函数:

 public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
 int maximumPoolSize, //最大线程池大小
 long keepAliveTime, //线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
 TimeUnit unit, //keepAliveTime时间单位
 BlockingQueue<Runnable> workQueue, //阻塞任务队列
 ThreadFactory threadFactory, //新建线程工厂
 RejectedExecutionHandler handler ) //当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
 

workQueue指被提交但是未执行的任务队列,是BlockingQueue接口的对象

1.直接提交队列:SynchronousQueue,该队列没有容量,每个插入操作对应一个删除操作,即提交的任务总是会交给线程执行,如果没有空闲进程,则创建新线程,数量达最大则执行拒绝策略,一般需要设置很大的maximumPoolSize

2.有界任务队列:ArrayBlockingQueue,有新任务时,若线程池的实际线程数小于corePoolSize,优先创建新线程,若大于corePoolSize,加入到等待队列,若队列已满,不大于maximumPoolSize前提下,创建新线程执行;当且仅当等待队列满时才会创建新线程,否则数量一直维持在corePoolSize

3.无界任务队列:LinkedBlockingQueue,小于corePoolSize时创建线程,达到corePoolSize则加入队列直到资源消耗殆尽

4.优先任务队列:PriorityBlockingQueue,特殊无界队列,总是保证高优先级的任务先执行.

  • Executors分析

newFixedThreadPool: corePoolSize=maximumPoolSize,线程不会超过corePoolSize,使用LinkedBlockingQueue

newSingleThreadPoolExecutor: newFixedThreadPool的弱化版,corePoolSize只有1

newCachedThreadPool: corePoolSize=0,maximumPoolSize为无穷大,空闲线程60秒回收,使用SynchronousQueue队列

  • ThreadPoolExecutor的execute()方法执行逻辑
 public void execute(Runnable command) {
 if (command == null)
 throw new NullPointerException();
 /*
 * Proceed in 3 steps:
 *
 * 1. If fewer than corePoolSize threads are running, try to
 * start a new thread with the given command as its first
 * task. The call to addWorker atomically checks runState and
 * workerCount, and so prevents false alarms that would add
 * threads when it shouldn't, by returning false.
 *
 * 2. If a task can be successfully queued, then we still need
 * to double-check whether we should have added a thread
 * (because existing ones died since last checking) or that
 * the pool shut down since entry into this method. So we
 * recheck state and if necessary roll back the enqueuing if
 * stopped, or start a new thread if there are none.
 *
 * 3. If we cannot queue task, then we try to add a new
 * thread. If it fails, we know we are shut down or saturated
 * and so reject the task.
 */
 int c = ctl.get();
 if (workerCountOf(c) < corePoolSize) { //检查是否小于corePoolSize
 if (addWorker(command, true)) //添加线程,执行任务
 return;
 c = ctl.get();
 }
 if (isRunning(c) && workQueue.offer(command)) { //添加进队列
 int recheck = ctl.get();
 if (! isRunning(recheck) && remove(command)) //双重校验
 reject(command);
 else if (workerCountOf(recheck) == 0)
 addWorker(null, false);
 }
 else if (!addWorker(command, false)) //提交线程池失败
 reject(command); //拒绝执行
 }
 

  • 拒绝策略

AbortPolicy :该策略会直接抛出异常,阻止系统正常工作。

CallerRunsPolicy :只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可 能会急剧下降。 任务,并尝试再次提交当前任务。

DiscardOldestPolicy :该策略将丢弃最老的一个请求,也就是即将被执行的一个

DiscardPolicy :该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!

  • 自定义ThreadFactory
 public static void main(String[] args) throws InterruptedException {
 MyTask task = new MyTask();
 ExecutorService es = new ThreadPoolExecutor(5, 5,
 0L, TimeUnit.MILLISECONDS,
 new SynchronousQueue<Runnable>(),
 new ThreadFactory(){
 @Override
 public Thread newThread(Runnable r) { //自定义创建线程的方法
 Thread t= new Thread(r);
 t.setDaemon(true);
 System.out.println("create "+t);
 return t;
 }
 }
 );
 for (int i = 0; i < 5; i++) {
 es.submit(task);
 }
 Thread.sleep(2000);
 }
 
  • 扩展线程池
public class ExtThreadPool {
 public static class MyTask implements Runnable {
 public String name;
 public MyTask(String name) {
 this.name = name;
 }
 @Override
 public void run() {
 System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId()
 + ",Task Name=" + name);
 try {
 Thread.sleep(100);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }
 public static void main(String[] args) throws InterruptedException {
 ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>()) {
 @Override
 protected void beforeExecute(Thread t, Runnable r) {
 System.out.println("准备执行:" + ((MyTask) r).name);
 }
 @Override
 protected void afterExecute(Runnable r, Throwable t) {
 System.out.println("执行完成:" + ((MyTask) r).name);
 }
 @Override
 protected void terminated() {
 System.out.println("线程池退出");
 }
 };
 for (int i = 0; i < 5; i++) {
 MyTask task = new MyTask("TASK-GEYM-" + i);
 es.execute(task);
 Thread.sleep(10);
 }
 es.shutdown(); //等待所有任务执行完毕后再关闭
 }
}
 
  • 异常堆栈消息

线程池中的异常堆栈可能不会抛出,需要我们自己去包装

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
 public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
 }
 @Override
 public void execute(Runnable task) {
 super.execute(wrap(task, clientTrace(), Thread.currentThread()
 .getName()));
 }
 @Override
 public Future<?> submit(Runnable task) {
 return super.submit(wrap(task, clientTrace(), Thread.currentThread()
 .getName()));
 }
 private Exception clientTrace() {
 return new Exception("Client stack trace");
 }
 private Runnable wrap(final Runnable task, final Exception clientStack,
 String clientThreadName) {
 return new Runnable() {
 @Override
 public void run() {
 try {
 task.run(); //外层包裹trycatch,即可打印出异常
 } catch (Exception e) {
 clientStack.printStackTrace();
 throw e;
 }
 }
 };
 }
}
 
  • Fork/Join框架

类似于mapreduce,用于大数据量,fork()创造子线程,join表示等待,

public class CountTask extends RecursiveTask<Long>{
 private static final int THRESHOLD = 10000; //任务分解规模
 private long start;
 private long end;
 public CountTask(long start,long end){
 this.start=start;
 this.end=end;
 }
 @Override
 public Long compute(){
 long sum=0;
 boolean canCompute = (end-start)<THRESHOLD; //表示计算极限10000,超出此值需要使用forkjoin
 if(canCompute){
 for(long i=start;i<=end;i++){
 sum +=i;
 }
 }else{
 //分成100个小任务
 long step=(start+end)/100;
 ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
 long pos=start;
 for(int i=0;i<100;i++){
 long lastOne=pos+step;
 if(lastOne>end)lastOne=end; //最后一个任务可能小于step,故需要此步
 CountTask subTask=new CountTask(pos,lastOne); //子任务
 pos+=step+1; //调整下一个任务
 subTasks.add(subTask);
 subTask.fork(); //fork子任务
 }
 for(CountTask t:subTasks){
 sum+=t.join(); //聚合任务
 }
 }
 return sum;
 }
 public static void main(String[]args){
 ForkJoinPool forkJoinPool = new ForkJoinPool();
 CountTask task = new CountTask(0,200000000000L);
 ForkJoinTask<Long> result = forkJoinPool.submit(task);
 try{
 long res = result.get();
 System.out.println("sum="+res);
 }catch(InterruptedException e){
 e.printStackTrace();
 }catch(ExecutionException e){
 e.printStackTrace();
 }
 }
}
 

注意:如果任务的划分层次很多,一直得不到返回,可能有两种原因:

1.系统内线程数量越积越多,导致性能严重下降

2.函数调用层次变多,导致栈溢出

  • Guava对线程池的拓展

1.特殊的DirectExecutor线程池

Executor executor=MoreExecutors.directExecutor(); // 仅在当前线程运行,用于抽象
 

2.Daemon线程池

提供将普通线程转换为Daemon线程.很多情况下,我们不希望后台线程池阻止程序的退出

public class MoreExecutorsDemo2 {
 public static void main(String[] args) {
 ThreadPoolExecutor exceutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2);
 MoreExecutors.getExitingExecutorService(exceutor);
 exceutor.execute(() -> System.out.println("I am running in " + Thread.currentThread().getName()));
 }
}
 

3.future模式扩展

待续….

谢谢关注~会努力更新的哦

文章来源:智云一二三科技

文章标题:实战java高并发程序设计第三章(一)

文章地址:https://www.zhihuclub.com/185788.shtml

关于作者: 智云科技

热门文章

网站地图