goroutine、channel原理

goroutine原理

概念介绍

  1. 并发

⼀个CPU上能同时执⾏多项任务,在很短时间内,CPU来回切换任务执⾏(在某段很短时间内执⾏程序a,然后⼜迅速得切换到程序b去执⾏),有时间上的重叠(宏观上是同时的,微观仍是顺序执⾏),这样看起来多个任务像是同时执⾏,这就是并发。

  1. 并⾏

当系统有多个CPU时,每个CPU同⼀时刻都运⾏任务,互不抢占⾃⼰所在的CPU资源,同时进⾏,称为并⾏。

  1. 进程

CPU在切换程序的时候,如果不保存上⼀个程序的状态(context–上下⽂),直接切换下⼀个程序,就会丢失上⼀个程序的⼀系列状态,于是引⼊了进程这个概念,⽤以划分好程序运⾏时所需要的资源。因此进程就是⼀个程序运⾏时候的所需要的基本资源单位(也可以说是程序运⾏的⼀个实体)。

  1. 线程

CPU切换多个进程的时候,会花费不少的时间,因为切换进程需要切换到内核态,⽽每次调度需要内核态都需要读取⽤户态的数据,进程⼀旦多起来,CPU调度会消耗⼀⼤堆资源,因此引⼊了线程的概念,线程本身⼏乎不占有资源,他们共享进程⾥的资源,内核调度起来不会那么像进程切换那么耗费资源。

  1. 协程

协程拥有⾃⼰的寄存器上下⽂和栈。协程调度切换时,将寄存器上下⽂和栈保存到其他地⽅,在切回来的时候,恢复先前保存的寄存器上下⽂和栈。因此,协程能保留上⼀次调⽤时的状态(即所有局部状态的⼀个特定组合),每次过程重⼊时,就相当于进⼊上⼀次调⽤的状态,换种说法:进⼊上⼀次离开时所处逻辑流的位置。线程和进程的操作是由程序触发系统接⼝,最后的执⾏者是系统;协程的操作执⾏者则是⽤户⾃身程序,goroutine也是协程。

Go并发模型

Go语⾔的并发处理参考了CSP(Communicating Sequential Process)模型。

CSP并发模型是在1970年左右提出的概念,属于⽐较新的概念,不同于传统的多线程通过共享内存来通信,CSP讲究的是“以通信的⽅式来共享内存”。

Go的CSP模型实现与原始的CSP实现有点差别:原始的CSP中channel⾥的任务都是⽴即执⾏的,⽽go语⾔为其增加了⼀个缓存,即任务可以先暂存起来,等待执⾏线程准备好再顺序执⾏。

Go的CSP并发模型,是通过goroutine和channel来实现的。

  • goroutine 是Go语⾔中并发的执⾏单位。有点抽象,其实就是和传统概念上的”线程“类似,可以理解为”线程“。
  • channel是Go语⾔中各个并发结构体(goroutine)之前的通信机制。通俗的讲,就是各个goroutine之间通信的”管道“,有点类似于Linux中的管道。

⽣成⼀个goroutine的⽅式⾮常的简单:Go⼀下,就⽣成了。

go f()

通信机制channel也很⽅便,传数据⽤channel <- data,取数据⽤<-channel。
在通信过程中,传数据channel <- data和取数据<-channel必然会成对出现,因为这边传,那边
取,两个goroutine之间才会实现通信。
⽽且不管传还是取,必阻塞,直到另外的goroutine传或者取为⽌。

Go调度器GMP

Go语⾔运⾏时环境提供了⾮常强⼤的管理goroutine和系统内核线程的调度器, 内部提供了三种对象:Goroutine,Machine,Processor

Goroutine : 指应⽤创建的goroutine

Machine : 指系统内核线程。

Processor : 指承载多个goroutine的运⾏器

在宏观上说,Goroutine与Machine因为Processor的存在,形成了多对多(M:N)的关系。M个⽤户线程对应N个系统线程,缺点增加了调度器的实现难度

Goroutine是Go语⾔中并发的执⾏单位。Goroutine底层是使⽤协程(coroutine)实现,coroutine是⼀种运⾏在⽤户态的⽤户线程(参考操作系统原理:内核态,⽤户态)它可以由语⾔和框架层调度。Go在语⾔层⾯实现了调度器,同时对⽹络,IO库进⾏了封装处理,屏蔽了操作系统层⾯的复杂的细节,在语⾔层⾯提供统⼀的关键字⽀持。

三者与内核级线程的关系如下所示:

1.jpg

⼀个Machine会对应⼀个内核线程(K),同时会有⼀个Processor与它绑定。⼀个Processor连接⼀个或者多个Goroutine。Processor有⼀个运⾏时的Goroutine(上图中绿⾊的G),其它的Goroutine处于等待状态。

Processor的数量同时可以并发任务的数量,可通过GOMAXPROCS限制同时执⾏⽤户级任务的操作系统线程。GOMAXPROCS值默认是CPU的可⽤核⼼数,但是其数量是可以指定的。在go语⾔运⾏时环境,可以使⽤

runtime.GOMAXPROCS(MaxProcs)

来指定Processor数量。

默认数量为

func schedinit() {
 //设置最⼤的M数量
 sched.maxmcount = 10000
}
  • 当⼀个Goroutine创建被创建时,Goroutine对象被压⼊Processor的本地队列或者Go运⾏时全局Goroutine队列。
  • Processor唤醒⼀个Machine,如果Machine的waiting队列没有等待被 唤醒的Machine,则创建⼀个(只要不超过Machine的最⼤值,10000),Processor获取到Machine后,与此Machine绑定,并执⾏此Goroutine。
  • Machine执⾏过程中,随时会发⽣上下⽂切换。当发⽣上下⽂切换时,需要对执⾏现场进⾏保护,以便下次被调度执⾏时进⾏现场恢复。Go调度器中Machine的栈保存在Goroutine对象上,只需要将Machine所需要的寄存器(堆栈指针、程序计数器等)保存到Goroutine对象上即可。
  • 如果此时Goroutine任务还没有执⾏完,Machine可以将Goroutine重新压⼊Processor的队列,等待下⼀次被调度执⾏。
  • 如果执⾏过程遇到阻塞并阻塞超时,Machine会与Processor分离,并等待阻塞结束。此时Processor可以继续唤醒Machine执⾏其它的Goroutine,当阻塞结束时,Machine会尝试”偷取”⼀个Processor,如果失败,这个Goroutine会被加⼊到全局队列中,然后Machine将⾃⼰转⼊Waiting队列,等待被再次唤醒。

channel原理

channel数据结构

channel⼀个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语⾔层⾯提供的goroutine间的通信⽅式。

Go依赖于称为CSP(Communicating Sequential Processes)的并发模型,通过
Channel实现这种同步模式。

通过channel来实现通信:

package main
import (
 "fmt"
 "time"
)
func goRoutineA(a <-chan int) {
 val := <-a
 fmt.Println("goRoutineA:", val) }
func goRoutineB(b chan int) {
 val := <-b
 fmt.Println("goRoutineB:", val) }
 func main() {
 ch := make(chan int, 3)
 go goRoutineA(ch)
 go goRoutineB(ch)
 ch <- 3
 time.Sleep(time.Second) 
 }

channel结构体:

//path:src/runtime/chan.go
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列⻓度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的⼤⼩
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写⼊时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,chan不允许并发读写
}

channel实现⽅式

chan内部实现了⼀个环形队列作为其缓冲区,队列的⻓度是创建chan时指定的。

下面展示了⼀个可缓存6个元素的channel示意图:

2.png
  • dataqsiz指示了队列⻓度为6,即可缓存6个元素
  • buf指向队列的内存,队列中还剩余两个元素
  • qcount表示队列中还有两个元素
  • sendx指示后续写⼊的数据存储的位置,取值[0, 6]
  • recvx指示从该位置读取数据, 取值[0, 6]

等待队列

从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。 向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。

被阻塞的goroutine将会挂在channel的等待队列中:

  • 因读阻塞的goroutine会被向channel写⼊数据的goroutine唤醒;
  • 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;

以下展示了⼀个没有缓冲区的channel,有⼏个goroutine阻塞等待读数据:

3.png

注意,⼀般情况下recvq和sendq⾄少有⼀个为空。只有⼀个例外,那就是同⼀个goroutine使⽤select语句向channel⼀边写数据,⼀边读数据。

channel读写

创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区⻓度由make语句传⼊,buf的⼤⼩则与元素⼤⼩和缓冲区⻓度共同决定。

func makechan(t *chantype, size int) *hchan {
 elem := t.elem
 // compiler checks this but be safe.
 if elem.size >= 1<<16 {
 throw("makechan: invalid channel element type")
 }
 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
 throw("makechan: bad alignment")
 }
 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
 if overflow || mem > maxAlloc-hchanSize || size < 0 {
 panic(plainError("makechan: size out of range"))
 }
 // Hchan does not contain pointers interesting for GC when elements
stored in buf do not contain pointers.
 // buf points into the same allocation, elemtype is persistent.
 // SudoG's are referenced from their owning thread so they can't be
collected.
 // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
 var c *hchan
 switch {
 case mem == 0:
 // Queue or element size is zero.
 c = (*hchan)(mallocgc(hchanSize, nil, true))
 // Race detector uses this location for synchronization.
 c.buf = c.raceaddr()
 case elem.ptrdata == 0:
 // Elements do not contain pointers.
 // Allocate hchan and buf in one call.
 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
 c.buf = add(unsafe.Pointer(c), hchanSize)
 default:
 // Elements contain pointers.
 c = new(hchan)
 c.buf = mallocgc(mem, elem, true)
 }
 c.elemsize = uint16(elem.size)
 c.elemtype = elem
 c.dataqsiz = uint(size)
 if debugChan {
 print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=",
elem.alg, "; dataqsiz=", size, "\n")
 }
 return c
 }
向channel写数据

向⼀个channel中写数据简单过程如下:

  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq
    取出G,并把数据写⼊,最后把该G唤醒,结束发送过程;
  2. 如果缓冲区中有空余位置,将数据写⼊缓冲区,结束发送过程;
  3. 如果缓冲区中没有空余位置,将待发送数据写⼊G,将当前G加⼊sendq,进⼊睡眠,等待被读goroutine唤醒;
4.png

从channel读数据

从⼀个channel读数据简单过程如下:

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,
    最后把G唤醒,结束读取过程;
  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中⾸部读出数据,把G中
    数据写⼊缓冲区尾部,把G唤醒,结束读取过程;
  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
  4. 将当前goroutine加⼊recvq,进⼊睡眠,等待被写goroutine唤醒;
5.png

关闭channel

关闭channel时会把recvq中的G全部唤醒,本该写⼊G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。

func closechan(c *hchan) {
 if c == nil {
 panic(plainError("close of nil channel"))
 }
 lock(&c.lock)
 if c.closed != 0 {
 unlock(&c.lock)
 panic(plainError("close of closed channel"))
 }
 if raceenabled {
 callerpc := getcallerpc()
 racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
 racerelease(c.raceaddr())
 }
 c.closed = 1
 var glist gList
 // 释放所有接收者
 for {
 sg := c.recvq.dequeue()
 if sg == nil {
 break
 }
 if sg.elem != nil {
 typedmemclr(c.elemtype, sg.elem)
 sg.elem = nil
 }
 if sg.releasetime != 0 {
 sg.releasetime = cputicks()
 }
 gp := sg.g
 gp.param = nil
 if raceenabled {
 raceacquireg(gp, c.raceaddr())
 }
 glist.push(gp)
 }
 // 释放所有发送者
 for {
 sg := c.sendq.dequeue()
 if sg == nil {
 break
 }
 sg.elem = nil
 if sg.releasetime != 0 {
 sg.releasetime = cputicks()
 }
 gp := sg.g
 gp.param = nil
 if raceenabled {
 raceacquireg(gp, c.raceaddr())
 }
 glist.push(gp)
 }
 unlock(&c.lock)
 // 垃圾回收
 for !glist.empty() {
 gp := glist.pop()
 gp.schedlink = 0
 goready(gp, 3)
 } }

除此之外,panic出现的常⻅场景还有:

  1. 关闭值为nil的channel
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据

发表评论

电子邮件地址不会被公开。 必填项已用*标注