您的位置 首页 golang

大白话 golang 教程-15-使用协程和并发通道

本章将学习 go 语言和其他语言最大的不同,在语法级别支持并发操作(天生),并发编程的门槛是很高的,通常写高质量的并发代码并不是那么容易,但是 golang 把这个难度降低了,让程序员写出高性能而且易于理解不绕口的并发代码。

系统调度的单位是线程,而线程的上下文切换是非常消耗资源的,协程非常的轻量,并且它对系统来说是透明的,它属于 go 运行时来管理,在普通的函数前加一个 go 关键字,就可以创建一个协程,对,就这么简单,不需要关心线程池、状态等复杂的概念,go 的抢占式调度会帮你安排好一切,由于协程创建的代价太小了,一个线程可能会占用 2M 的内存空间,而一个协程起步只需要 2K(随着栈增长会增加),所以你可以创建几百上千万的协程,而且你根本不需要一个协程池。

不过要掌握好 go 的并发,也需要好好学习它的调度模型(GMP)和用法。下面的代码创建了 2 个协程:

 func main() {
  go func() {} ()
}  

一个是 main,一个是匿名的 func。

 func main() {
  go func() {
    fmt.Println("我来自协程")
  }()

  fmt.Println("wait...")

  // select {}
  time.Sleep(3 * time.Second)
}  

上面的输出结果是:

 wait...
我来自协程

// 顺序不保证,也可能打印
我来自协程
wait...  

使用 time.Sleep 的原因是协程虽然轻量,但是调度和执行也是需要时间的,为了让 main 不要退出的太快让协会程有机会运行(正式场景不会这样用),如果去掉 go 关键字,那么匿名函数会先执行才能打印 wait… 完全是按顺序执行的,如果在循环里那就是挨个循环执行,但如果加上 go 关键字,就创建了 N 个协程并发执行,时间将会大大缩短。修改一下代码:

 func main() {
  for i := 0; i < 10; i++ {
    go func(index int) {
      fmt.Printf("我来自协程 %d\n", index)
      time.Sleep(1 * time.Second)
    }(i)
  }

  fmt.Println("wait...")

  // select {}
  time.Sleep(3 * time.Second)
}  

试试去掉 go 关键字,你发现打印的顺序是一定的,而且时间比加上 go 关键字要长很多,这就是并发的优势,不过这里 time.Sleep 的 3秒钟实在是太丑了,更大的问题是万一创建了 N 个协程,3 秒内还没有完成呢? 改成 5 秒,7 秒? 显然不 OK,因为我们需要对协程做的工作是否完成有个监控,使用 sync.WaitGroup 可以轻松的完成这个工作。修改一下:

 var wg sync.WaitGroup

func main() {
  for i := 0; i < 10; i++ {
    wg.Add(1) // 对协程进行计数

    go func(index int) {
      fmt.Printf("我来自协程 %d\n", index)
      wg.Done() // 通知一下,本协程把问题搞完了
    }(i)
  }

  fmt.Println("wait...")
  wg.Wait() // 等待所有的协程通知我
}  

现在好了,使用全局的 wg 变量类通讯,不但并发高效的完成了工作,而且 main 也能及时退出了。WaitGroup 通过对协程同步计数来完成任务,这些协程之间本身并不认识,如果协程和协程需要通讯怎么办? 比如,一个做饭,一个吃饭,吃饭的总得等着做饭的把饭做好吧。别急,先回到上一章节等待时间的例子:

 func timeAfterTest() {
  deadline := time.After(1 * time.Second)
  fmt.Printf("%T\n", deadline) // <-chan time.Time

  select {
  case <-deadline:
    fmt.Println("到时间了")
  }
}  

发现了一个新的类型: chan,chan 表示通道/信道(就像水管,里面流淌着数据流),chan time.Time 表示通道里的数据类型是 time.Time,<- chan time.Time 表示通道里可以拿出来 time.Time 数据,select 一直等待着 deadline 里留出 time.Time 数据,等到了 case 分支就被执行了,这个机制实现了 main 和时间倒计数协程的通讯,因为 select 本身是阻塞的,deadline 没有通知 case 无法运行,select 就无法结束。

所以,通道就是协程用来通讯的机制,它本身是一个容器,这个容器有大小,可以打开、可以关闭。我们使用通道来模拟吃饭的例子:

 func produce(ready chan bool) {
  fmt.Println("做饭中...")
  time.Sleep(50 * time.Microsecond)
  ready <- true
}

func eatTest() {
  ready := make(chan bool) // 无缓冲
  go produce(ready)        // 去掉 go 死锁
  for {
    select {
    case <-ready:
      fmt.Println("可以吃了")
      return
    default:
      fmt.Println("我先干点别的事")
    }
  }
}  

上面代码的细节:

1. 使用 make 函数来构造 chan 类型的变量,省略第二个参数表示通道没有缓冲,没有缓冲的意思就是发送和接受两方必须同步交接

2. for 是个死循环,当 <- ready 不能交接的时候,执行 default 分支,有交接了 return 结束了整个函数

3. 使用 go produce(ready) 来创建做饭的协程,而且看出 chan 是引用传递,如果去掉 go 关键字 ready,ready<- true 放不进去因为没有同步的接收方,导致死锁

4. chan 类型的变量是同步安全的,由 go 运行时来保证,写代码不需要考虑它被多个协程都同步修改的情况

如果理解有困难,把代码精简一下:

 func eatTest2() {
  ready := make(chan bool) // 无缓冲
  go produce(ready)        // 去掉 go 死锁
  <-ready                  // 同步交接
  fmt.Println("可以吃了")
}  

记住,无缓冲的通道必须同步交接,没有接收方发送方也无法把数据放进通道中,main 中的 <- ready 试图从 ready 通道中拿点数据,但是 produce 函数还没有准备好,main 就阻塞了(如果先是 produce 执行到了也阻塞等带 main),执行到 ready <- true 的时候两边都准备好,同步交接完成,main 和 produce 继续往下执行。

这里 <- ready 并没有去关心通道里的值究竟是多少,如果 produce 函数把通道 close 了(close 函数导致通道广播),<-ready 也能收到信号,但是关闭的通道不能再打开了。

如果要模拟做饭失败的情况修改如下:

 func produce2(ready chan bool) {
  fmt.Println("做饭中...")
  // 做饭成功的与否的算法
  success := false
  if time.Now().Second()%2 == 0 {
    success = true
  }
  ready <- success
}

func eatTest3() {
  ready := make(chan bool) // 无缓冲
  go produce2(ready)       // 去掉 go 死锁
  success := <-ready       // 同步交接
  //success, ok := <-ready // 关闭的通道 ok 为 false
  if success {
    fmt.Println("可以吃了")
  } else {
    fmt.Println("喝西北风")
  }
}  

操作符 <- 看起来点绕,如果 chan 变量在左边 chanVar <- xx 表示写入,在右边 <- chanVar 表示读取,可见 -< 是指数据流向的方向。上面的 chan 通道都是无缓冲的,相对的有缓冲的通道需要给 make 第二个参数。

 ready := make(chan bool)    // 1
ready := make(chan bool, 0) // 2
ready := make(chan bool, 1) // 3  

1 和 2 是相同的,都表示无缓冲的通道,3 不同,他是有缓冲的,通道的容量是 1,表示最多可以暂存 1 个数据在里面,下面这个例子帮助理解:

 func createBufChanVar() {
  //channel := make(chan string)
  channel := make(chan string, 1)
  go func() {
    fmt.Println(<-channel)
  }()

  channel <- "我来啦"

  // 死锁
  //channel <- "我来啦"
  //channel <- "我来啦"
  //fatal error: all goroutines are asleep - deadlock!

  for i := 0; i < 10; i++ {
    fmt.Printf("for 循环 %d\n", i)
    time.Sleep(1 * time.Second)
  }
}  

如果 channel 是无缓冲的,那么执行到 channel <- “我来啦” 会阻塞,一直等到 go func 执行到 <- channel 同步交接数据,”我来啦” 永远会打印在 for 循环前面,但如果 channel 是有缓冲的,channel <- “我来啦” 可以把数据放进通道继续往后执行,这时候 for 循环可能会先打印,如果连续的三条 channel <- “我来啦” 程序就会发生死锁,因为第一次放进去,go func 取走了一个,通道空了还可以放一个但是没有谁会再取走了,再次执行 channel <- “我来啦” 后阻塞,因为通道大小是 1,再也放不下了,go 运行时检测到死锁,程序崩溃。

所以有缓冲的通道,通道中没有塞满的时候不会发生阻塞,如果通道满了,放入数据就会阻塞,如果通道空了,取数据就会阻塞,发送方和接受方在交接数据的时候不需要同步准备好,不用同时见面就可以完成数据交换,但是无缓冲的通道就必须面对面交易,等我来验货不然你就给我等着,因为交易地点没有暂存货物的地方。

回到本文开始,time.After 返回的类型是 <- chan time.Time,这是只读通道,只能发出数据,同样的 chan <- time.Time 就是只写通道,只能接收数据,这两种都叫单向通道,可读可写的就是双向通道。

你可能认为有缓冲的通道比无缓冲的通道有用,事实并非如此,它们都有各自的应用场景,可以使用熟悉的 len 和 cap 函数获取通道当前的数据流长度和容量。

 func lenCap() {
  d1 := make(chan int)
  d2 := make(chan int, 3)
  d2 <- 1
  fmt.Println(len(d1), cap(d1)) // 0 0
  fmt.Println(len(d2), cap(d2)) // 1 3
}  

有缓冲的通道的数据流可以使用 for-range 读取,一直到通道关闭为止。

 func forRangeChan() {
  queue := make(chan string, 3)

  go func() {
    queue <- "one"
    queue <- "two"
    queue <- "three"
    close(queue) // 去掉 close for-range 将死锁
  }()

  for elem := range queue {
    fmt.Println(elem)
  }
}  

看看下面的官网的打印斐波那契数列的例子:

 func fibonacci(n int, c chan int) {
  x, y := 0, 1
  for i := 0; i < n; i++ {
    c <- x
    x, y = y, x+y
  }
  close(c) // 一定要 close,不然 for-range 会阻塞死锁
}

func main() {
  c := make(chan int, 10)
  go fibonacci(cap(c), c)
  for i := range c {
    fmt.Println(i)
  }
}  

有时候我们不关心 chan 通道里的值,就像做饭的第一个例子一样,只想表示某件事或者某个状态达成了,可以使用 chan struct {},通过结构体章节的学习知道 struct {} 空结构体是内存空间的,会被编译器特殊处理,它的内存地址固定的。做一个简单的服务器,这个服务器唯一的功能就是打印字符串和序号:

 //server 服务实例
type server struct {
  stopFlag  chan struct{}
  waitGroup sync.WaitGroup
}

//Work 做事
func (s *server) Work(index int) {
  time.Sleep(50 * time.Microsecond)
  fmt.Printf("干活儿 %d\n", index)
}

//Stop 停止
func (s *server) Stop() {
  close(s.stopFlag) // 直接关闭通道
  s.waitGroup.Wait() // 等带 handle 的 goroutine 都正常结束
}

func (s *server) handle(index int) {
  s.waitGroup.Add(1)
  go func() {
    for {
      select {
      case <-s.stopFlag:
        fmt.Printf("handle %d over", index)
        s.waitGroup.Done()
        return
      default:
        s.Work(index)
      }
    }
  }()
}  

按照 go 语言的习惯,添加一个工场函数:

 func newServer() *server {
  return &server{make(chan struct{}), sync.WaitGroup{}}
}  

下面的代码创建 server 后,给它分配一点儿的工作,在 500 毫秒后通知它停下来:

 func createServerTest() {
  srv := newServer()
  for i := 0; i < 3; i++ {
    srv.handle(i)
  }
  select {
  case <-time.After(500 * time.Microsecond):
    srv.Stop()
  }
}  

这个服务器的例子使用无缓冲的通道实现了 “是否应该停止服务” 这个事件的通讯,很多编程语言中多个线程传递数据的方式是共享内存,因此需要对内存进行读写保护,go 语言也支持这种共享内存的模型,但是更提倡使用通讯顺序进程 CSP 来完并发操作,goroutine 就是 CSP 模型里的实体,chan 就是 CSP 模型里的传递信息的媒介,也就是 goroutine 通过 chan 来完成通讯(数据传递)。

通道 chan 通讯前面已经有很多例子了,相信你已经理解了,下面举例一个 chan 锁定的例子,它并不用来通讯,只用来保存一段指令对内存的操作。代码的功能是完成循环计数,不过你多次运行发现它并不能得到正确的结果,理论上应该得到 100,但是运行可能出现 99、98 等。

 func inc(x *int) {
  *x = *x + 1
}

func sumByLoop() {
  x := 0
  for i := 0; i < 100; i++ {
    go inc(&x)
  }
  // 为了代码结构简洁,简单的加个停顿(应该使用 sync.waitGroup 实现)
  time.Sleep(1 * time.Second)
  fmt.Println(x)
}  

因为 *x = *x + 1 不是原子操作,它点读取 x,累加后再写入,多个协程工作的时候,有的协程读取了老的 x 累加后会写入把其他协程的累加计数覆盖了,所以最终的结果可能小于 100,下面是修改后的版本,使用一个容量大小是 1 的有缓冲通道,来控制 *x = *x + 1,此处的 chan 并没有用来传递数据:

 func inc(x *int, lock chan struct{}) {
  lock <- struct{}{}
  *x = *x + 1
  <-lock
}

func sumByLoop() {
  x := 0
  lock := make(chan struct{}, 1)
  for i := 0; i < 100; i++ {
    go inc(&x, lock)
  }
  // 为了代码结构简洁,简单的加个停顿(应该使用 sync.waitGroup 实现)
  time.Sleep(1 * time.Second)
  fmt.Println(x)
}  

再来运行,每次都是 100 了,因为建立的是有缓冲的通道,执行 lock <- struct{}{} 后,还能继续往下运行,又因为缓冲通道的大小只有 1,上一个协程在没有执行 <-lock 将通道清空之前,其他的协程只能等着把 struct{}{} 放进去,这样执行 *x = *x +1 的永远只有一个协程,保护了这段内存空间的操作,这是一个典型的互斥锁,可直接使用系统提供的 sync.Mutex 来实现:

 func inc2(x *int, mutex *sync.Mutex) {
  mutex.Lock()
  *x = *x + 1
  mutex.Unlock()
}

func sumByLoop2() {
  x := 0
  mutex := &sync.Mutex{}
  for i := 0; i < 100; i++ {
    go inc2(&x, mutex)
  }
  // 为了代码结构简洁,简单的加个停顿(应该使用 sync.waitGroup 实现)
  time.Sleep(1 * time.Second)
  fmt.Println(x)
}  

回到协程通道的话题,有没有发现常见一个协程后,无法主动的强制去关闭它,最优雅的方式就是通知它让它自己退出,或者整个进程退出了。当需要管理很多个协程的状态(和很多子协程),使用 context 包也许更加清晰明了,它的接口申明如下:

 type Context interface {
  Deadline() (deadline time.Time, ok bool) // 到期时间
  Done() <-chan struct{} // 做完了
  Err() error // 报错信息
  Value(key interface{}) interface{} // 绑定的值
}  

Context 的几个构造函数:

1. context.WithCancel

2. context.WithDeadline

3. context.WithTimeout

4. context.WithValue

它们的第一个参数都是父 context,默认 context.Background 是根 context,根 context 不能不取消。

模仿前面 server 的功能:

 func handle(ctx context.Context, index int) {
  for {
    select {
    case <-ctx.Done():
      fmt.Printf("handle %d over", index)
      return
    default:
      fmt.Printf("干活了 %d\n", index)
    }
  }
}

func contextServerTest() {
  ctx, cancel := context.WithCancel(context.Background())
  for i := 0; i < 3; i++ {
    go handle(ctx, i)
  }
  time.Sleep(1 * time.Second)  
  cancel() // 可以 defer cancel
  time.Sleep(1 * time.Second)
}  

服务器开启了 3 个协程工作,1 秒后手动 cancel 通知他们停下来,如果超时自动 cancel 可以使用 context.WithTimeout 或者 context.WithDeadline,一个是相对时间,一个绝对时间:

 func contextServerTest2() {
  ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) 
  //ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
  //cancel() // 仍然可以手动提前结束
  for i := 0; i < 3; i++ {
    go handle(ctx, i)
  }
  time.Sleep(2 * time.Second)
}  

不过这两个例子并没有演示 context 的优势,稍微修改一下,当父 context 被 cancel 的时候,子 context 也会遭殃,这样 goroutine 就好管理了,如此推导,是不是就理解了?

 func contextServerTest3() {
  parent, parentCancel := context.WithCancel(context.Background())
  ctx1, _ := context.WithCancel(parent)
  ctx2, _ := context.WithTimeout(parent, 10*time.Second)
  for i := 0; i < 3; i++ {
    go handle(ctx1, i)
  }
  for i := 3; i < 6; i++ {
    go handle(ctx2, i)
  }
  time.Sleep(1 * time.Second)
  parentCancel() // 子 ctx1,ctx2 也会遭殃
  time.Sleep(1 * time.Second)
}  

本章节的代码

文章来源:智云一二三科技

文章标题:大白话 golang 教程-15-使用协程和并发通道

文章地址:https://www.zhihuclub.com/86280.shtml

关于作者: 智云科技

热门文章

网站地图