之前在看 golang 多线程通信的时候, 看到了go 的管道. 当时就觉得这玩意很神奇, 因为之前接触过的不管是 php , java , Python , js , c 等等, 都没有这玩意, 第一次见面, 难免勾起我的好奇心. 所以就想着看一看它具体是什么东西. 很明显, 管道是 go 实现在语言层面的功能, 所以我以为需要去翻他的源码了. 虽然最终没有翻到 C 的层次, 不过还是受益匪浅.



要想知道他是什么东西, 没什么比直接看他的定义更加直接的了. 但是其定义在哪里么? 去哪里找呢? 还记得我们是如何创建 chan 的么? make 方法. 但是当我找过去的时候, 发现 make 方法只是一个函数的声明.

这, 还是没有函数的具体实现啊. 汇编看一下. 编写以下内容:

 package main

func main() {
_ = make( ch an int)


go tool compile -N -l -S main.go

虽然汇编咱看不懂, 但是其中有一行还是引起了我的注意.

golang chan 探究

make 调用了 runtime .makechan . 漂亮, 就找他.

golang chan 探究

找到他了, 是 hchan 指针对象. 整理了一下对象的字段(不过人家自己也有注释的):

 // 其内部维护了一个循环队列(数组), 用于管理发送与接收的缓存数据. 
type hchan struct {
  // 队列中元素个数
qcount   uint
  // 队列的大小(数组长度)
dataqsiz uint
  // 指向底层的缓存队列, 是一个可以指向任意类型的指针. 
buf      unsafe.Pointer
  // 管道每个元素的大小
elemsize uint16
  // 是否被关闭了
closed   uint32
  // 管道的元素类型
elemtype *_type
  // 当前可以发送的元素 索引 (队尾)
sendx    uint  
  // 当前可以接收的元素索引(队首)
recvx    uint  
  // 当前等待接收数据的 goroutine 队列
recvq    waitq
  // 当前等待发送数据的 goroutine 队列
sendq    waitq 
// 锁, 用来保证管道的每个操作都是原子性的. 
 lock  mutex

可以看的出来, 管道简单说就是一个队列加一把锁.


依旧使用刚才的方法分析, 发送数据时调用了 runtime.chansend1 函数. 其实现简单易懂:

golang chan 探究

然后查看真正实现, 函数步骤如下(个人理解, 有一些 test 使用的代码被我删掉了. ):

 func chansend(c *hchan, ep unsafe.Pointer,  block  bool, callerpc uintptr) bool {
  // 异常处理, 若管道指针为空
if c == nil {
if !block {
return false
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
// 常量判断, 恒为 false, 应该是开发时调试用的. 
if debugChan {
print("chansend: chan=", c, "n")
// 常量, 恒为 false, 没看懂这个判断
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
  // 若当前操作不阻塞, 且管道还没有关闭时判断
  // 当前队列容量为0且没有等待接收数据的 或 当前队列容量不为0且队列已满
  // 那么问题来了, 什么时候不加锁呢? select 的时候. 可以在不阻塞的时候快速返回
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
// 上锁, 保证操作的原子性
// 若管道已经关闭, 报错
if c.closed != 0 {
panic(plainError("send on closed  channel "))
// 从接受者队列获取一个接受者, 若存在, 数据直接发送, 不走缓存, 提高效率
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
// 若缓存为满, 则将数据放到缓存中排队
if c.qcount < c.dataqsiz {
    // 取出对尾的地址
qp := chanbuf(c, c.sendx)
    // 将ep 的内容拷贝到 ap 地址
typedmemmove(c.elemtype, qp, ep)
    // 更新队尾索引
if c.sendx == c.dataqsiz {
c.sendx = 0
return true
// 若当前不阻塞, 直接返回
if !block {
return false
// 当走到这里, 说明数据没有成功发送, 且需要阻塞等待. 
  // 以下代码没看懂, 不过可以肯定的是, 其操作为阻塞当前协程, 等待发送数据
 gp  := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
panic(plainError("send on closed channel"))
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
mysg.c = nil
return true

虽然最终阻塞的地方没看太明白, 不过发送数据的大体流程很清楚:

  1. 若无需阻塞且不能发送数据, 返回失败
  2. 若存在接收者, 直接发送数据
  3. 若存在缓存, 将数据放到缓存中
  4. 若无需阻塞, 返回失败
  5. 阻塞等待发送数据

其中不加锁的操作, 在看到 selectnbsend 函数的注释时如下:

 // compiler implements
//select {
//case c <- v:
//... foo
//... bar
// as
//if selectnbsend(c, v) {
//... foo
//} else {
//... bar
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())

看这意思, select 关键字有点类似于语法糖, 其内部会转换成调用 selectnbsend 函数的简单 if 判断.


至于接收数据的方法, 其内部实现与发送大同小异. runtime.chanrecv 方法.

源码简单看了一下, 虽理解不深, 但对 channel 也有了大体的认识.


简单对 channel 的使用总结一下.


 // 创建普通的管道类型, 非缓冲
a := make(chan int)
// 创建缓冲区大小为10的管道
b := make(chan int, 10)
// 创建只用来发送的管道
c := make(chan<- int)
// 创建只用来接收的管道
d := make(<-chan int)
// eg: 只用来接收的管道, 每秒一个
e := time.After(time.Second)


 // 接收数据
a := <- ch
b, ok := <- ch
// 发送数据
ch <- 2

最后, 看了一圈, 感觉 channel 并不是很复杂, 就是一个队列, 一端接受, 一端发送. 不过其对多协程处理做了很多优化. 与协程配合, 灵活使用的话, 应该会有不错的效果.


