您的位置 首页 golang

goroutine、channel原理


goroutine原理

概念介绍

  1. 并发

⼀个CPU上能同时执⾏多项任务,在很短时间内,CPU来回切换任务执⾏(在某段很短时间内执⾏程序a,然后⼜迅速得切换到程序b去执⾏),有时间上的重叠(宏观上是同时的,微观仍是顺序执⾏),这样看起来多个任务像是同时执⾏,这就是并发。

  1. 并⾏

当系统有多个CPU时,每个CPU同⼀时刻都运⾏任务,互不抢占⾃⼰所在的CPU资源,同时进⾏,称为并⾏。

  1. 进程

CPU在切换程序的时候,如果不保存上⼀个程序的状态(context–上下⽂),直接切换下⼀个程序,就会丢失上⼀个程序的⼀系列状态,于是引⼊了进程这个概念,⽤以划分好程序运⾏时所需要的资源。因此进程就是⼀个程序运⾏时候的所需要的基本资源单位(也可以说是程序运⾏的⼀个实体)。

  1. 线程

CPU切换多个进程的时候,会花费不少的时间,因为切换进程需要切换到内核态,⽽每次调度需要内核态都需要读取⽤户态的数据,进程⼀旦多起来,CPU调度会消耗⼀⼤堆资源,因此引⼊了线程的概念,线程本身⼏乎不占有资源,他们共享进程⾥的资源,内核调度起来不会那么像进程切换那么耗费资源。

  1. 协程

协程拥有⾃⼰的寄存器上下⽂和栈。协程调度切换时,将寄存器上下⽂和栈保存到其他地⽅,在切回来的时候,恢复先前保存的寄存器上下⽂和栈。因此,协程能保留上⼀次调⽤时的状态(即所有局部状态的⼀个特定组合),每次过程重⼊时,就相当于进⼊上⼀次调⽤的状态,换种说法:进⼊上⼀次离开时所处逻辑流的位置。线程和进程的操作是由程序触发系统接⼝,最后的执⾏者是系统;协程的操作执⾏者则是⽤户⾃身程序,goroutine也是协程。

Go并发模型

Go语⾔的并发处理参考了CSP(Communicating Sequential Process)模型。

CSP并发模型是在1970年左右提出的概念,属于⽐较新的概念,不同于传统的多线程通过共享内存来通信,CSP讲究的是“以通信的⽅式来共享内存”。

Go的CSP模型实现与原始的CSP实现有点差别:原始的CSP中channel⾥的任务都是⽴即执⾏的,⽽go语⾔为其增加了⼀个缓存,即任务可以先暂存起来,等待执⾏线程准备好再顺序执⾏。

Go的CSP并发模型,是通过goroutine和channel来实现的。

  • goroutine 是Go语⾔中并发的执⾏单位。有点抽象,其实就是和传统概念上的”线程“类似,可以理解为”线程“。
  • channel是Go语⾔中各个并发结构体(goroutine)之前的通信机制。通俗的讲,就是各个goroutine之间通信的”管道“,有点类似于Linux中的管道。

⽣成⼀个goroutine的⽅式⾮常的简单:Go⼀下,就⽣成了。

go f()

通信机制channel也很⽅便,传数据⽤channel <- data,取数据⽤<-channel。
在通信过程中,传数据channel <- data和取数据<-channel必然会成对出现,因为这边传,那边
取,两个goroutine之间才会实现通信。
⽽且不管传还是取,必阻塞,直到另外的goroutine传或者取为⽌。

Go调度器GMP

Go语⾔运⾏时环境提供了⾮常强⼤的管理goroutine和系统内核线程的调度器, 内部提供了三种对象:Goroutine,Machine,Processor

Goroutine : 指应⽤创建的goroutine

Machine : 指系统内核线程。

Processor : 指承载多个goroutine的运⾏器

在宏观上说,Goroutine与Machine因为Processor的存在,形成了多对多(M:N)的关系。M个⽤户线程对应N个系统线程,缺点增加了调度器的实现难度

Goroutine是Go语⾔中并发的执⾏单位。Goroutine底层是使⽤协程(coroutine)实现,coroutine是⼀种运⾏在⽤户态的⽤户线程(参考操作系统原理:内核态,⽤户态)它可以由语⾔和框架层调度。Go在语⾔层⾯实现了调度器,同时对⽹络,IO库进⾏了封装处理,屏蔽了操作系统层⾯的复杂的细节,在语⾔层⾯提供统⼀的关键字⽀持。

三者与内核级线程的关系如下所示:

1.jpg

⼀个Machine会对应⼀个内核线程(K),同时会有⼀个Processor与它绑定。⼀个Processor连接⼀个或者多个Goroutine。Processor有⼀个运⾏时的Goroutine(上图中绿⾊的G),其它的Goroutine处于等待状态。

Processor的数量同时可以并发任务的数量,可通过GOMAXPROCS限制同时执⾏⽤户级任务的操作系统线程。GOMAXPROCS值默认是CPU的可⽤核⼼数,但是其数量是可以指定的。在go语⾔运⾏时环境,可以使⽤

runtime.GOMAXPROCS(MaxProcs)

来指定Processor数量。

默认数量为

func schedinit() { //设置最⼤的M数量 sched.maxmcount = 10000}
  • 当⼀个Goroutine创建被创建时,Goroutine对象被压⼊Processor的本地队列或者Go运⾏时全局Goroutine队列。
  • Processor唤醒⼀个Machine,如果Machine的waiting队列没有等待被 唤醒的Machine,则创建⼀个(只要不超过Machine的最⼤值,10000),Processor获取到Machine后,与此Machine绑定,并执⾏此Goroutine。
  • Machine执⾏过程中,随时会发⽣上下⽂切换。当发⽣上下⽂切换时,需要对执⾏现场进⾏保护,以便下次被调度执⾏时进⾏现场恢复。Go调度器中Machine的栈保存在Goroutine对象上,只需要将Machine所需要的寄存器(堆栈指针、程序计数器等)保存到Goroutine对象上即可。
  • 如果此时Goroutine任务还没有执⾏完,Machine可以将Goroutine重新压⼊Processor的队列,等待下⼀次被调度执⾏。
  • 如果执⾏过程遇到阻塞并阻塞超时,Machine会与Processor分离,并等待阻塞结束。此时Processor可以继续唤醒Machine执⾏其它的Goroutine,当阻塞结束时,Machine会尝试”偷取”⼀个Processor,如果失败,这个Goroutine会被加⼊到全局队列中,然后Machine将⾃⼰转⼊Waiting队列,等待被再次唤醒。

channel原理

channel数据结构

channel⼀个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语⾔层⾯提供的goroutine间的通信⽅式。

Go依赖于称为CSP(Communicating Sequential Processes)的并发模型,通过
Channel实现这种同步模式。

通过channel来实现通信:

package mainimport ( "fmt" "time")func goRoutineA(a <-chan int) { val := <-a fmt.Println("goRoutineA:", val) }func goRoutineB(b chan int) { val := <-b fmt.Println("goRoutineB:", val) } func main() { ch := make(chan int, 3) go goRoutineA(ch) go goRoutineB(ch) ch <- 3 time.Sleep(time.Second)  }

channel结构体:

//path:src/runtime/chan.gotype hchan struct {qcount uint // 当前队列中剩余元素个数dataqsiz uint // 环形队列⻓度,即可以存放的元素个数buf unsafe.Pointer // 环形队列指针elemsize uint16 // 每个元素的⼤⼩closed uint32 // 标识关闭状态elemtype *_type // 元素类型sendx uint // 队列下标,指示元素写⼊时存放到队列中的位置recvx uint // 队列下标,指示元素从队列的该位置读出recvq waitq // 等待读消息的goroutine队列sendq waitq // 等待写消息的goroutine队列lock mutex // 互斥锁,chan不允许并发读写}

channel实现⽅式

chan内部实现了⼀个环形队列作为其缓冲区,队列的⻓度是创建chan时指定的。

下面展示了⼀个可缓存6个元素的channel示意图:

2.png
  • dataqsiz指示了队列⻓度为6,即可缓存6个元素
  • buf指向队列的内存,队列中还剩余两个元素
  • qcount表示队列中还有两个元素
  • sendx指示后续写⼊的数据存储的位置,取值[0, 6]
  • recvx指示从该位置读取数据, 取值[0, 6]

等待队列

从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。 向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。

被阻塞的goroutine将会挂在channel的等待队列中:

  • 因读阻塞的goroutine会被向channel写⼊数据的goroutine唤醒;
  • 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;

以下展示了⼀个没有缓冲区的channel,有⼏个goroutine阻塞等待读数据:

3.png

注意,⼀般情况下recvq和sendq⾄少有⼀个为空。只有⼀个例外,那就是同⼀个goroutine使⽤select语句向channel⼀边写数据,⼀边读数据。

channel读写

创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区⻓度由make语句传⼊,buf的⼤⼩则与元素⼤⼩和缓冲区⻓度共同决定。

func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elementsstored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't becollected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=",elem.alg, "; dataqsiz=", size, "\n") } return c }
向channel写数据

向⼀个channel中写数据简单过程如下:

  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq
    取出G,并把数据写⼊,最后把该G唤醒,结束发送过程;
  2. 如果缓冲区中有空余位置,将数据写⼊缓冲区,结束发送过程;
  3. 如果缓冲区中没有空余位置,将待发送数据写⼊G,将当前G加⼊sendq,进⼊睡眠,等待被读goroutine唤醒;
4.png

从channel读数据

从⼀个channel读数据简单过程如下:

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,
    最后把G唤醒,结束读取过程;
  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中⾸部读出数据,把G中
    数据写⼊缓冲区尾部,把G唤醒,结束读取过程;
  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
  4. 将当前goroutine加⼊recvq,进⼊睡眠,等待被写goroutine唤醒;
5.png

关闭channel

关闭channel时会把recvq中的G全部唤醒,本该写⼊G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。

func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist gList // 释放所有接收者 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // 释放所有发送者 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // 垃圾回收 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }

除此之外,panic出现的常⻅场景还有:

  1. 关闭值为nil的channel
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据

文章来源:智云一二三科技

文章标题:goroutine、channel原理

文章地址:https://www.zhihuclub.com/1344.shtml

关于作者: 智云科技

热门文章

网站地图