您的位置 首页 java

Java并发编程的艺术09-并发栈与消除现象

本章讲述并发栈,消除现象,无锁交换机,后退消除栈

栈(Stack) 是一种后进先出(LIFO)的数据结构。通常有 push() 和 pop() 函数。最后入栈的元素放在栈顶位置,并且最先出栈。本章内容我们将讨论栈并发。初看起来,栈似乎不可能支持并发(同一时刻栈顶只能被 push 或 pop),因为 push() , pop() 调用似乎都需要在栈顶进行同步。而不像队列是在入队和出队的两边分别进行同步。

为了简单起见我们在本章的例子中规定不能往栈中存放 null 元素。

无锁的无界栈

无界指的是对栈的大小没有限制,可以无限增长。

试图从空栈中删除一个数据项的 pop() 调用会抛出异常。push() 首先创建一个新节点,然后调用 tryPush() 来尝试将节点压入栈顶。如果 tryPush() 成功,则push()调用将返回,否则,在后退一段时间以后重新进行 tryPush() 尝试。pop() 调用 tryPop() , tryPop() 使用 compareAndSet() 尝试将栈顶节点出栈。如果成功就返回出栈的节点,否则返回 null。pop() 会在调用 tryPop() 失败后,后退一段时间然后再次尝试。使用后退机制是为了减少在栈顶 top 域上的争用。

该实现是无锁的,因为仅当有无限多次成功调用修改了栈顶时,线程才不能完成 push() 或 pop() 调用。push() 和 pop() 函数的线性化点则是成功的 compareAndSet() 调用,或对空的栈调用 pop() 抛出异常的时刻。

 public class LockFreeStack<T> {
    AtomicReference<Node> top = new AtomicReference<Node>(null);
    static final int MIN_DELAY = 1024;
    static final int MAX_DELAY = 256 * MIN_DELAY;
    Backoff backoff = new Backoff(MIN_DELAY , MAX_DELAY);
    
    protected boolean tryPush(Node node) {
        Node oldTop = top.get();
        node.next = oldTop;
       return top.compareAndSet(oldTop , node);
    }
    
    public void push(T value) {
        Node node = new Node(value);
        while (true) {
            if (tryPush(node)) {
                return;
            } else {
                backoff.backoff();
            }
        }
    }
    
    protected Node tryPop() {
        Node oldTop = top.get();
        if (oldTop == null) {
            throw new EmptyException();
        }
        Node newTop = oldTop.next;
        return top.compareAndSet(oldTop , newTop) ? oldTop : null;
    }
    
    public T pop() throws EmptyException {
        while (true) {
            Node node = tryPop();
            if (node != null) {
                return node.value;
            } else {
                backoff.backoff();
            }
        }
    }
    
    protected class Node {
        public T value;
        public Node next;
        public Node(T value) {
            this.value = value;
            this.next = null;
        }
    }
}  

消除现象

LockFreeStack 实现的可扩展性非常差,并不仅仅因为 top 是一个争用源,而主要因为这种栈是一个顺序瓶颈:函数调用只能一个接一个的前进,按照对 top 域的 compareAndSet() 成功调用次序来排序。

虽然指数后退可以有效的减少栈顶的争用,但并不能从根本上减轻顺序瓶颈的压力。为了让栈能够并行,我们利用栈的这样一种现象:如果一个 push() 后面紧跟了一个 pop() ,则这两个操作可以互相抵消,而栈的状态却不发生改变。这就好像这两个操作从来没有发生过一样对于栈来说。(这是一个有趣的现象。)如果能够利用这种现象,采用某种办法让使得并发的 push , pop 可以相互抵消,则正在调用 push() 的线程就可以在不改变栈本身的情况下与正在调用 pop() 的线程交换数据。称这样的两个调用互相消除。

无锁交换机

所谓的交换机,就是仅允许两个线程会合并交换对象。如果线程A用参数a调用了交换机,而线程B用参数b调用了同一个交换机,则线程A的调用会返回b值,线程B的调用会返回a值。交换机让第一个到达的线程写入自己值,然后自旋等待直到第二个线程到来。随后,第二个线程检测到第一个线程在等待,于是读取第一个线程写入的值,并向交换机发出信号。现在两个线程都读取了对方的值,然后返回。如果第二个线程没有出现,第一个线程调用则可能会超时,从而使得若在一个合理的时间内无法完成交换,线程就离开交换机继续前进。

交换机有一个 AtomicStampReference 的 slot 域,交换机有三种可能的状态 EMPTY, WAITING , BUSY 。slot 的印记记录了交换机的状态。交换机的主循环在超时前一直执行,直到timeout异常结束。一个线程读取 slot 的状态,并按照如下流程处理:

EMPTY :则该线程尝试将数据放入 slot 中,并调用 compareAndSet() 将状态设为 WAITING 。如果失败则说明其他线程成功了,该线程重试。如果成功,则数据被放入到 slot 中并且状态为 WAITING,所以该线程会自旋等待另外一个线程完成交换。如果另外一个线程出现,它将取出 slot 中的数据,并且用自己的数据替换它,把交换机状态设置为 BUSY,通知等待的线程交换已经完成。等待的线程将获取到该数据并把状态重新设置为 EMPTY 。如果没有其他线程出现,等待线程需要重新将 slot 状态设置为 EMPTY 。这个状态改变需要使用 compareAndSet() 因为其他线程有可能试图把 WAITING 状态变为 BUSY。如果这个调用成功会产生一个超时异常。如果失败,则说明某个线程正在进行交换,所以该等待的线程完成了交换。

WAITING :某个线程正在等待且 slot 中包含它的数据。当前线程会取出 slot 中的数据,并试图通过 compareAndSet() 将状态从 WAITING 变为 BUSY 来用它自己的数据来替换 slot 中的数据。如果有另外一个线程成功,或按照一个超时重置状态为 EMPTY ,则该调用就会失败。这时该线程会重试,如果它成功的将状态变为 BUSY ,则返回 slot 中原来的数据。

BUSY :说明此时有两个线程正在使用 slot 进行数据交换,线程会重试直到超时或成功完成交换。

 public class LockFreeExchanger<T> {
    static final int EMPTY = ... , WAITING = ... , BUSY = ...;
    AtomicStampReference<T> slot = new AtomicStampReference(null , 0);
    
    public T exchange(T myItem , long timeout ,TimeUnit unit) throws TimeoutException {
        long nanos = unit.toNanos(timeout);
        long timeBound = System.nanoTime() + nanos;
        int[] stampHolder = {EMPTY};
        while (true) {
            if (System.nanoTime() > timeBound) {
                throw new TimeoutException();
            }
            T yrItem = slot.get(stampHolder);
            int stamp = stampHolder[0];
            switch (stamp) {
                case EMPTY: {
                    if (slot.compareAndSet(yrItem , myItem , EMPTY , WAITING)) {
                        while (System.nanoTime() < timeBound) {
                            yrItem = slot.get(stampHolder);
                            if (stampHolder[0] == BUSY) {
                                slot.set(null , EMPTY);
                                return yrItem;
                            }
                        }
                        
                        if (slot.compareAndSet(myItem , null , WAITING , EMPTY)) {
                            throw new TimeoutException();
                        } else {
                            yrItem = slot.get(stampHolder);
                            slot.set(null , EMPTY);
                            return yrItem;
                        }
                    }
                    break;
                }
                case WAITING: {
                    if (slot.compareAndSet(yrItem , myItem , WAITING , BUSY)) {
                        return yrItem;
                    }
                    break;
                }
                case BUSY: break;
                default: break;
            }
        }
    }
}  

消除数组

EliminationArray 是作为一个最大容量为 capacity 的数组来实现的,数组中的数据类型为 LockFreeExchanger 。准备进行一次交换的线程从数组中随机选择一个对象,并调用该对象的 exchange() ,保证将自己的输入作为与另一个对象交换的值。visit() 会返回它的交换伙伴线程的输入值,或者是抛出一个超时异常。

 public class EliminationArray {
    private static final int duration = ... ;
    LockFreeExchanger<T>[] exchanger;
    Random random = new Random();
    
    public EliminationArray(int capacity) {
        exchanger = (LockFreeExchanger<T>[]) new LockFreeExchanger[capacity];
        for (int i = 0;i < capacity; i++) {
            exchanger = new LockFreeExchanger<T>();
        }
    }
    
    public T visit(T value , int range) throws TimeoutException {
        int slot = random.nextInt(range);
        return (exchanger[slot].exchange(value , duration , TimeUnit.MILLISECONDS));
    }
}   

后退消除栈

如下图所示,线程通过EliminationArray来消除其他的线程,线程随机的来选取数组项来尝试发现互补的调用。

互补的push和pop调用对相互交换数值并返回。如果一个线程的调用不能消除,或者找不到一个配对调用,或者是所找到的配对类型不匹配(比如一个push遇到一个push),这种线程或者重新在一个新单元上再次尝试,或者访问共享的LockFreeStack。这种由数组和共享栈组成的数据结构是可线性化的,因为共享栈是可线性化的,并且可以排序被消除的调用,就好像它们发生在交换数值的时间点。可以将EliminationArray作为一种在共享LockFreeStack上的后退模式。每个线程首先访问LockFreeStack,如果它们调用失败了,该线程尝试使用数组而不是采用简单的后退来消除它的调用。如果没有能够消除它自己,则该线程再次访问LockFreeStack。这种结构被称为 EliminationBackoffStack。

我们先来讲个故事,这个故事讲了两个朋友在选举日讨论政治问题,每个人都想劝说对方改变立场,但都不能成功。最后,其中一个对另一个说:“瞧,既然我们在所有的政治问题上观点都不同,那么我们的选票自然也就互相抵消了,为什么我们不节省我们两个的时间,今天都不去投票呢?”另一个人高兴的同意了,于是两人分开了。不久第一个人的朋友听到这个谈话后对他说:“你这个建议很公平。” ”并不一定,“后者说:”这是我们今天第三次这样做了。“

我们构造中所采用的原理与这个故事是一样的。我们希望允许包含入栈和出栈操作的线程协商并抵消,但必须避免一个线程可以和多个线程达成约定的情形。

 public class EliminationBackoffStack<T> extends LockFreeStack<T> {
    static final int capacity = ... ;
    EliminationArray eliminationArray = new EliminationArray(capacity);
    static ThreadLocal<RangePolicy> policy = new ThreadLocal<RangePolicy>() {
        protected synchronized RangePolicy initialValue() {
            return new RangePolicy();
        } 
    };
    
    
    public void push(T value) {
        RangePolicy rangePolicy = policy.get();
        Node node = new Node(value);
        while (true) {
            if (tryPush(node)) {
                return;
            } else {
                try {
                    T otherValue = eliminationArray.visit(value , rangePolicy.getRange());
                    if (otherValue == null) {
                        rangePolicy.recordEliminationSuccess();
                        return;
                    }
                } catch(TimeoutException e) {
                    rangePolicy.recordEliminationTimeout();
                }
            }
        }
    }
    
    public T pop() throws EmptyException {
        while (true) {
            Node node = tryPop();
            if (node != null) {
                return node.value;
            } else {
                try {
                    T otherValue = eliminationArray.visit(null , rangePolicy.getRange());
                    if (otherValue != null) {
                        rangePolicy.recordEliminationSuccess();
                        return otherValue;
                    }
                } catch(TimeoutException e) {
                    rangePolicy.recordEliminationTimeout();
                }
            }
        }
    }
}  

EliminationBackoffStack 是 LockFreeStack 的子类,覆盖了 push() , pop() ,并增加了一个 EliminationArray 域。当 tryPush() 或 tryPop() 调用失败时,不再进行简单的后退,而是尝试使用 EliminationArray 来进行交换。如果交换成功,则正在进行 push 的线程会检查得到的值是否为 null 来确认是否是被一个 pop 线程交换了值。(因为 pop 总是把 null 穿给交换机。)当 pop 线程进行值交换的时候,如果交换成功它会检查得到的值是否不为 null ,来确认是否是于一个 push 线程进行了交换。

交换也有可能不成功,或许是因为交换没有发生,或者交换发生在同一种类型的线程之间(比如一个 pop 线程和另一个 pop 线程。)为了简单起见,采用一种简单的方式来处理这个问题,就是重新尝试 tryPush() 或 tryPop() 。

一个重要的参数就是 EliminationArray 的范围选取,从这个范围中线程可以选择一个交换机对象。一个小的范围将使得当线程个数很少时,冲突成功的机会较大;而一个大的范围则会降低在一个 BUSY 的交换机上线程等待的可能性(一个交换机一次只能处理一个交换)。如果访问数组的线程很少,则应该选择较小的范围;而当线程数增加时,范围也应增大。可以通过一个 RangePolicy 对象来动态的控制范围,该对象记录了成功交换的次数和超时失败的次数。之所以忽略了由于类型不匹配造成的交换失败(比如 push 线程和 push 线程交换),是因为对于任何给定的 push() 和 pop() 调用的分布,这种情况所占的比例是固定的。一种简单的策略就是随着失败的次数增加而减小范围,反之亦然。还有很多其他策略。例如,可以设计一种更精巧的范围选取策略,在交换机上动态的改变延迟,在访问共享栈前增加后退延迟,动态的控制是否访问共享栈或数组。

EliminationBackoffStack 是一个可以被线性化的栈:任何通过访问LockFreeStack成功返回的 push() 和 pop() 都可以在访问 LockFreeStack 时被线性化。每一对被消除的 push() 和 pop() 可以在它们冲突时线性化。通过消除来完成的调用不会影响到在 LockFreeStack 中完成的可线性化性,因为它们可能已经在 LockFreeStack 的任意一个状态生效,且假如已经生效,LockFreeStack 的状态并没有改变。

因为 EliminationArray 是一种有效的后退模式,所以期望在低负载的情况下它的性能与 LockFreeStack 差不多。于 LockFreeStack 不同的是,EliminationArray 具有扩展性。当负载增加时,成功消除的个数会增大,从而允许很多操作并行执行。由于被消除的操作不会访问栈,所以在栈上的争用也少了。

——The End——

文章来源:智云一二三科技

文章标题:Java并发编程的艺术09-并发栈与消除现象

文章地址:https://www.zhihuclub.com/195357.shtml

关于作者: 智云科技

热门文章

网站地图