浅析context

为什么会有这个包

我们其实知道goroutine是没有父子关系,也没有先后顺序的,所以也就没有了我们常说的子进程退出后的通知机制。那么成百上千的goroutine如何协同工作:通信,同步,退出,通知

1.通信:goroutine 的通知就是依靠chan
2.同步:goroutine如何同步其实我们可以通过无缓冲的channel和sync包下的waitgroup机制来进行同步
3.通知:goroutine间如何通知呢,其实通知不同于通信,通知更多的是管理和控制流数据。这个可以用两个chan来进行管控,一个进行业务流交互,另外一个用作通知做入库之类的操作,但是并不通用,这个管控的难度会随业务复杂度增加而增加
4.退出:这个在我上篇文章其实有单独提出来说的,等待退出机制,借助select 的广播机制实现退出

其实上面看似好像每一个部分都能有解决方案,但是实际拎出来讲一下如果每个goroutine退出都要写一个等待退出,那么go的便捷性是不是完完全全损失掉了。实际编码过程中goroutine没有父子关系,goroutine多开goroutine,最终形成一个树状调用结构,那么这里就有个问题,我在一个goroutine中如何知道另外一个goroutine是否退出呢,这就是大型项目必须要考虑的东西了。

context 起到了什么样的作用

1.退出通知机制,通知可以传递到整个goroutine调用树上的每一个goroutine
2.传递数据,数据可以传递给整个gortouine调用树上的每一个goroutine

基本数据结构

整体工作机制:创建第一个context的goroutine 被称为root节点,root节点负责创建一个context接口的具体对象,并将对象作为参数传递到新的goroutine。下游的goroutine可以继续封装该对象。这样的传递过程就生成了一个树状结构。此时root节点就可以传递消息到下游goroutine,

context 接口

type Context interface {
    //如果context 实现了超时控制,则此方法这返回ok true,deadline为超时时间,否则ok为false
    Deadline() (deadline time.Time, ok bool)

    //后端被调的goroutine应该返回监听的方法返回的chan 以便及时释放资源
    Done() <-chan struct{}

    //Done返回的chan收到的通知的时候,才可以访问此方法为什么被取消0
    Err() error

    //可以访问上游的goroutine 传递给下游的goroutine的值
    Value(key interface{}) interface{}
}

Cancer 接口

type canceler interface {
    //一个context 如果被实现了cancel接口,则可以被取消的
    
    //创建cancel接口实例的goroutine 调用cancel方法通知后续创建goroutine退出
    cancel(removeFromParent bool, err error)
    //Done方法需要chan 返回goroutine来监听,并及时退出
    Done() <-chan struct{}
}

empty Context

我们平常使用的方法如下,其实空的节点最大的特点就是形成root节点,emptyctx所有的方法都是空的,并不具备任何功能。因为context包的使用思路就是不停的调用context包提供的包装函数来创建具有特殊功能的context实例,每一个context都以上一个context为参照对象,最终形成一个树状结构


func main()  {

    c :=context.TODO()
}

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}

func (e *emptyCtx) String() string {
    switch e {
    case background:
        return "context.Background"
    case todo:
        return "context.TODO"
    }
    return "unknown empty Context"
}

var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

cancel context

cancelCtx 是一个实现了canceler接口,conceler具有退出通知功能,值得注意的是退出通知并不能通知到自己,但能逐层的通知到children节点。
//cancelCtx可以被取消,cancelCtx取消会同时取消所有实现canceler接口的孩子节点


type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     chan struct{}         // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

func (c *cancelCtx) Value(key interface{}) interface{} {
    if key == &cancelCtxKey {
        return c
    }
    return c.Context.Value(key)
}

func (c *cancelCtx) Done() <-chan struct{} {
    c.mu.Lock()
    if c.done == nil {
        c.done = make(chan struct{})
    }
    d := c.done
    c.mu.Unlock()
    return d
}

func (c *cancelCtx) Err() error {
    c.mu.Lock()
    err := c.err
    c.mu.Unlock()
    return err
}

func (c *cancelCtx) String() string {
    return contextName(c.Context) + ".WithCancel"
}

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return // already canceled
    }
    c.err = err
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done)
    }
    for child := range c.children {
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
        removeChild(c.Context, c)
    }
}

timerCtx

timerCtx是一个实现了Context的接口的具体类型,内部封装了cancelCtx类型实例,同时有一个deadline实例,用来实现定时退出通知

type timerCtx struct {
    *cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
    return c.deadline, true
}

func (c *timerCtx) String() string {
    return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now()))
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
    c.cancelCtx.cancel(false, err)
    if removeFromParent {
        // Remove this timerCtx from its parent cancelCtx's children.
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
        c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()
}

valueCtx

valueCtx 是实现了一个context接口的具体类型,内部封装了Context接口类型,同时封装了一个key,value存储变量,valueCtx可用来传递通知信息

type valueCtx struct {
    Context
    key, val interface{}
}

func (c *valueCtx) String() string {
    return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {
        return c.val
    }
    return c.Context.Value(key)
}

API函数

api函数则不做细讲,具体可通过编译器查看其返回参数

1. func Background()context
2.  func Todo()context
3.withCancel(parent Context)(ctx Context,cancel CancelFunc)
4.withDeadline(parent Context)(ctx Context,deadline time.Time)
5.withTimeout(parent Context,timeout time.Duration)(ctx Context,timeout time.Duration)
6.withValue(parent Context,key,value interface{})context

辅助函数

上面的Api函数是给外部创建ctx对象结构的api的话,那么其内部有些通用函数,我们可以来讲一讲

1.func propagateCancel(parent Context, child canceler)

1.判断parentDone方法是否为nil,如果是nil,那么说明parent是一个可取消的Context对象,也就灭有所谓的取消构造树,说明child就是取消构造树的根
2.如果parent方法Done返回不是nil,那么向上回溯自己的祖先是否为cancelCtx的类型实例,如果是,则将child注册到parent树中去
3.如果向上回溯自己的祖先都不是cancelCtx类型实例,说明整个聊条的取消树都不是连续的,此时只需要监听父类的关闭和自己的取消信号即可


// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
    if parent.Done() == nil {
        return 
    }
    if p, ok := parentCancelCtx(parent); ok {
        p.mu.Lock()
        if p.err != nil {
            // parent has already been canceled
            child.cancel(false, p.err)
        } else {
            if p.children == nil {
                p.children = make(map[canceler]bool)
            }
            p.children[child] = true
        }
        p.mu.Unlock()
    } else {
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

2.func parentCancelCtx(parent Context) (*cancelCtx, bool)

判断parent中是否封装cancelCtx,或者接口中存放的底层类型是否是cancelCtx类型

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    for {
        switch c := parent.(type) {
        case *cancelCtx:
            return c, true
        case *timerCtx:
            return c.cancelCtx, true
        case *valueCtx:
            parent = c.Context
        default:
            return nil, false
        }
    }
}

3.func removeChild(parent Context, child canceler)

如果parent 封装的cancelCtx 字段类型,或者接口里面存放的底层类型是cancelCtx类型,则将其构造树上的节点删除

// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
    p, ok := parentCancelCtx(parent)
    if !ok {
        return
    }
    p.mu.Lock()
    if p.children != nil {
        delete(p.children, child)
    }
    p.mu.Unlock()
}

实际应用

1.测试withDeadline

//测试withDeadline
func main(){
    root:=context.Background()

    son,cancel:=context.WithDeadline(root,time.Now().Add(3*time.Second))

    defer cancel()
    go work(son,"woker_son")

    time.Sleep(100*time.Second)

}

func work(ctx context.Context,name string){

    fmt.Println(name)
    label:
    for{
        select {
        case <-ctx.Done():
            //到了我们设置的超时时间就会走到这里来
            fmt.Println("过了超时时间")

            break label
        default:
            time.Sleep(4*time.Second)
            fmt.Println("hahah")
        }
    }
    fmt.Println("跳出循环了")
}

运行结果

woker_son
hahah
过了超时时间
跳出循环了

具体应用场景
其实我们可以利用这个上下文进行一些延时器等应用场景,超时重试等机制可以借助这个来进行而不需要定时器

2.测试withCancel

//测试withDeadline
func main(){
    root:=context.Background()

    son,cancel:=context.WithCancel(root)

    fmt.Println(son)
    go work(son,"woker_son")
    go work(son,"woker_son2")

    cancel()
    time.Sleep(100*time.Second)

}

func work(ctx context.Context,name string){

    label:
    for{
        select {
        case <-ctx.Done():
            //到了我们设置的超时时间就会走到这里来
            fmt.Printf("%v 听到了关闭了通道\n",name)

            break label
        default:
            time.Sleep(410*time.Second)
            fmt.Println("hahah")
        }
    }
    fmt.Println("跳出循环了")
}

结果是:

context.Background.WithCancel
woker_son 听到了关闭了通道
跳出循环了
woker_son2 听到了关闭了通道
跳出循环了

这个应用场景其实相对比较宽泛,timeCtx 和CancerCtx都是做了cancel接口的继承。当父类goroutine退出,就可以通知到其下级。但是具体的多级还是需要各位看客去亲自试一试了,有兴趣可以试一下照着我上篇浅析golang并发的计时器试着用这个写一下,具体应用场景还是挺多的

其他的例子就不多写了,withTimeout实际就是调用的是deadline

一个实际的随机数生成器的例子

func main(){
    root:=context.Background()

    son,cancel:=context.WithCancel(root)

    for i:=0;i<10;i++{
    fmt.Println(<-CreateInt(son))
    }
    cancel()

}

//  简单的随机数生成器
func CreateInt(ctx context.Context)chan int{

    ch:=make(chan int)

    go func() {
        label:
            for{
                select {
                case <-ctx.Done():
                    break label
                case ch<-rand.Int():
                }
            }
            close(ch)
    }()

    return ch
}

如有问题欢迎讨论,创作不易,转载请标明出处


发表评论

电子邮件地址不会被公开。 必填项已用*标注