java并发编程

任何一个java对象都继承于Object类,在 线程 间实现通信的往往会用到Object的几个方法,如:wait(),wait(long timeout),wait(long timeout, int nanos)与notify(),notifyAll()几个方法
实现等待/通知机制
Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,
而Condition与 Lock 配合完成等待通知机制,前者是java底层级别的,后者是语言级别的,具有更高的可控制性和扩展性 。
Condition :
await() 线程进入等待状态
signal () 唤醒一个等待在condition上的线程
下面是 Condition 基于非公平锁的独占锁实现 代码示例:
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import java .util.Date;
import java.util.concurrent. lock s.Condition;
public class SimplifyReentrantLockDemo {
/* 是基于非公平锁的独占锁实现。
* 在获得同步锁时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;
* 移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。
* 在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。*/
public static void main(String[] args) {
final SimplifyReentrantLock lock = new SimplifyReentrantLock();
//一个AQS中可以有多个条件队列,但是只有一个同步队列。
//ConditionObject是实现条件队列的关键,
// 每个ConditionObject对象都维护一个单独的条件等待对列
//每个ConditionObject对应一个条件队列,它记录该队列的头节点和尾节点。
//条件队列是单向的,而同步队列是双向的,会有前驱指针。
final Condition condition = lock.newCondition();
new Thread(new Runnable() {
public void run() {
lock.lock();
try {
System.out.println("A-----------进入等待!"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
/*
condition.await()方法后会使得当前获取lock的线程进入到等待队列,
如果该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock
当前线程调用condition.await()方法后,会使得当前线程释放lock然后加入到等待队列中,
直至被signal/signalAll后会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock后才会从await方法返回,或者在等待时被中断会做中断处理。
*/
condition.await(); // 释放锁
System.out.println("A-----------接收到B的通知!继续执行!"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
} catch (Interrupted Exception e) {
e.printStackTrace();
}
lock.unlock();
}
}).start();
/*
void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程*/
new Thread(new Runnable() {
public void run() {
try {
System.out.println("B-----------模拟3秒后发送通知过!"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
System.out.println("B----------发送通知!"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
//signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
condition.signal();
lock.unlock();
}
}).start();
}
}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class SimplifyReentrantLock implements Lock {
//AQS子类的对象 互斥锁用它来工作
private final Sync sync = new Sync();
/**
* AQS的子类Sync
* AQS全称为AbstractQueuedSynchronizer
* AQS是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。
*
* 一个重要的却别就是没有ReentrantLock中的NonfairSync和FairSync
*/
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean isHeldExclusively() {
//是否处于占用状态
return get State () == 1;
}
@Override
protected boolean tryAcquire(int arg) {
//当状态为0是获取锁
//这里做一下判断,如果state的值为等于0,立马将state设置为1
//通过cas操作来修改state状态,表示争抢锁的操作
int state = this.getState();
/*
* compareAndSetState(0, 1)
*
这段代码其实逻辑很简单,就是通过cas 乐观锁 的方式来做比较并替换。
上面这段代码的意思是,如果当前内存中的state的值和预期值expect相等,则替换为update。
更新成功返回true,否则返回false.
这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作,*/
if(state == 0) {
if (compareAndSetState(0, arg)) {
//若直接修改成功了,则将占用锁的线程设置为当前线程
//exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用同步状态的线程。
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}else { //如果是第二个线程进来
Thread currentThread = Thread.currentThread();//当前进来的线程
Thread ownerThread = this.getExclusiveOwnerThread();//已经保存进去的独占式线程
if(currentThread == ownerThread) { //判断一下进来的线程和保存进去的线程是同一线程么?如果是,则获取锁成功,如果不是则获取锁失败
//state属性的总结:
//1. 因为存在 多线程 竞争的情形,使用CAS设置值
//2. state初始值为0,线程加一次锁,state加1,获得锁的线程再次加锁,state值再次加1。所以state表示已获得锁线程进行lock操作的次数
//3. 是 volatile 修饰的变量,线程直接从主存中读
this.setState(state+arg); //设置state状态
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
//锁的获取和锁的释放是一一对应的,获取过多少次锁就释放多少次锁
if(Thread.currentThread() != this.getExclusiveOwnerThread()) {
//如果释放锁的不是当前线程,则抛出异常
throw new RuntimeException();
}
//释放锁,将状态设置为0
if (getState() == 0) {
//如果state状态为0,则说明锁还没有被占用
throw new IllegalMonitorStateException();
}
int state = this.getState()-arg;
//接下来判断state是否已经归零,只有state归零的时候才真正的释放锁
if(state == 0) {
//state已经归零,做扫尾工作
this.setState(0);
this.setExclusiveOwnerThread(null);
return true;
}
this.setState(state);
return false;
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
// if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException { }
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
}
