本章将学习 go 语言和其他语言最大的不同,在语法级别支持并发操作(天生),并发编程的门槛是很高的,通常写高质量的并发代码并不是那么容易,但是 golang 把这个难度降低了,让程序员写出高性能而且易于理解不绕口的并发代码。
系统调度的单位是线程,而线程的上下文切换是非常消耗资源的,协程非常的轻量,并且它对系统来说是透明的,它属于 go 运行时来管理,在普通的函数前加一个 go 关键字,就可以创建一个协程,对,就这么简单,不需要关心线程池、状态等复杂的概念,go 的抢占式调度会帮你安排好一切,由于协程创建的代价太小了,一个线程可能会占用 2M 的内存空间,而一个协程起步只需要 2K(随着栈增长会增加),所以你可以创建几百上千万的协程,而且你根本不需要一个协程池。
不过要掌握好 go 的并发,也需要好好学习它的调度模型(GMP)和用法。下面的代码创建了 2 个协程:
func main() {
go func() {} ()
}
一个是 main,一个是匿名的 func。
func main() {
go func() {
fmt.Println("我来自协程")
}()
fmt.Println("wait...")
// select {}
time.Sleep(3 * time.Second)
}
上面的输出结果是:
wait...
我来自协程
// 顺序不保证,也可能打印
我来自协程
wait...
使用 time.Sleep 的原因是协程虽然轻量,但是调度和执行也是需要时间的,为了让 main 不要退出的太快让协会程有机会运行(正式场景不会这样用),如果去掉 go 关键字,那么匿名函数会先执行才能打印 wait… 完全是按顺序执行的,如果在循环里那就是挨个循环执行,但如果加上 go 关键字,就创建了 N 个协程并发执行,时间将会大大缩短。修改一下代码:
func main() {
for i := 0; i < 10; i++ {
go func(index int) {
fmt.Printf("我来自协程 %d\n", index)
time.Sleep(1 * time.Second)
}(i)
}
fmt.Println("wait...")
// select {}
time.Sleep(3 * time.Second)
}
试试去掉 go 关键字,你发现打印的顺序是一定的,而且时间比加上 go 关键字要长很多,这就是并发的优势,不过这里 time.Sleep 的 3秒钟实在是太丑了,更大的问题是万一创建了 N 个协程,3 秒内还没有完成呢? 改成 5 秒,7 秒? 显然不 OK,因为我们需要对协程做的工作是否完成有个监控,使用 sync.WaitGroup 可以轻松的完成这个工作。修改一下:
var wg sync.WaitGroup
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 对协程进行计数
go func(index int) {
fmt.Printf("我来自协程 %d\n", index)
wg.Done() // 通知一下,本协程把问题搞完了
}(i)
}
fmt.Println("wait...")
wg.Wait() // 等待所有的协程通知我
}
现在好了,使用全局的 wg 变量类通讯,不但并发高效的完成了工作,而且 main 也能及时退出了。WaitGroup 通过对协程同步计数来完成任务,这些协程之间本身并不认识,如果协程和协程需要通讯怎么办? 比如,一个做饭,一个吃饭,吃饭的总得等着做饭的把饭做好吧。别急,先回到上一章节等待时间的例子:
func timeAfterTest() {
deadline := time.After(1 * time.Second)
fmt.Printf("%T\n", deadline) // <-chan time.Time
select {
case <-deadline:
fmt.Println("到时间了")
}
}
发现了一个新的类型: chan,chan 表示通道/信道(就像水管,里面流淌着数据流),chan time.Time 表示通道里的数据类型是 time.Time,<- chan time.Time 表示通道里可以拿出来 time.Time 数据,select 一直等待着 deadline 里留出 time.Time 数据,等到了 case 分支就被执行了,这个机制实现了 main 和时间倒计数协程的通讯,因为 select 本身是阻塞的,deadline 没有通知 case 无法运行,select 就无法结束。
所以,通道就是协程用来通讯的机制,它本身是一个容器,这个容器有大小,可以打开、可以关闭。我们使用通道来模拟吃饭的例子:
func produce(ready chan bool) {
fmt.Println("做饭中...")
time.Sleep(50 * time.Microsecond)
ready <- true
}
func eatTest() {
ready := make(chan bool) // 无缓冲
go produce(ready) // 去掉 go 死锁
for {
select {
case <-ready:
fmt.Println("可以吃了")
return
default:
fmt.Println("我先干点别的事")
}
}
}
上面代码的细节:
1. 使用 make 函数来构造 chan 类型的变量,省略第二个参数表示通道没有缓冲,没有缓冲的意思就是发送和接受两方必须同步交接
2. for 是个死循环,当 <- ready 不能交接的时候,执行 default 分支,有交接了 return 结束了整个函数
3. 使用 go produce(ready) 来创建做饭的协程,而且看出 chan 是引用传递,如果去掉 go 关键字 ready,ready<- true 放不进去因为没有同步的接收方,导致死锁
4. chan 类型的变量是同步安全的,由 go 运行时来保证,写代码不需要考虑它被多个协程都同步修改的情况
如果理解有困难,把代码精简一下:
func eatTest2() {
ready := make(chan bool) // 无缓冲
go produce(ready) // 去掉 go 死锁
<-ready // 同步交接
fmt.Println("可以吃了")
}
记住,无缓冲的通道必须同步交接,没有接收方发送方也无法把数据放进通道中,main 中的 <- ready 试图从 ready 通道中拿点数据,但是 produce 函数还没有准备好,main 就阻塞了(如果先是 produce 执行到了也阻塞等带 main),执行到 ready <- true 的时候两边都准备好,同步交接完成,main 和 produce 继续往下执行。
这里 <- ready 并没有去关心通道里的值究竟是多少,如果 produce 函数把通道 close 了(close 函数导致通道广播),<-ready 也能收到信号,但是关闭的通道不能再打开了。
如果要模拟做饭失败的情况修改如下:
func produce2(ready chan bool) {
fmt.Println("做饭中...")
// 做饭成功的与否的算法
success := false
if time.Now().Second()%2 == 0 {
success = true
}
ready <- success
}
func eatTest3() {
ready := make(chan bool) // 无缓冲
go produce2(ready) // 去掉 go 死锁
success := <-ready // 同步交接
//success, ok := <-ready // 关闭的通道 ok 为 false
if success {
fmt.Println("可以吃了")
} else {
fmt.Println("喝西北风")
}
}
操作符 <- 看起来点绕,如果 chan 变量在左边 chanVar <- xx 表示写入,在右边 <- chanVar 表示读取,可见 -< 是指数据流向的方向。上面的 chan 通道都是无缓冲的,相对的有缓冲的通道需要给 make 第二个参数。
ready := make(chan bool) // 1
ready := make(chan bool, 0) // 2
ready := make(chan bool, 1) // 3
1 和 2 是相同的,都表示无缓冲的通道,3 不同,他是有缓冲的,通道的容量是 1,表示最多可以暂存 1 个数据在里面,下面这个例子帮助理解:
func createBufChanVar() {
//channel := make(chan string)
channel := make(chan string, 1)
go func() {
fmt.Println(<-channel)
}()
channel <- "我来啦"
// 死锁
//channel <- "我来啦"
//channel <- "我来啦"
//fatal error: all goroutines are asleep - deadlock!
for i := 0; i < 10; i++ {
fmt.Printf("for 循环 %d\n", i)
time.Sleep(1 * time.Second)
}
}
如果 channel 是无缓冲的,那么执行到 channel <- “我来啦” 会阻塞,一直等到 go func 执行到 <- channel 同步交接数据,”我来啦” 永远会打印在 for 循环前面,但如果 channel 是有缓冲的,channel <- “我来啦” 可以把数据放进通道继续往后执行,这时候 for 循环可能会先打印,如果连续的三条 channel <- “我来啦” 程序就会发生死锁,因为第一次放进去,go func 取走了一个,通道空了还可以放一个但是没有谁会再取走了,再次执行 channel <- “我来啦” 后阻塞,因为通道大小是 1,再也放不下了,go 运行时检测到死锁,程序崩溃。
所以有缓冲的通道,通道中没有塞满的时候不会发生阻塞,如果通道满了,放入数据就会阻塞,如果通道空了,取数据就会阻塞,发送方和接受方在交接数据的时候不需要同步准备好,不用同时见面就可以完成数据交换,但是无缓冲的通道就必须面对面交易,等我来验货不然你就给我等着,因为交易地点没有暂存货物的地方。
回到本文开始,time.After 返回的类型是 <- chan time.Time,这是只读通道,只能发出数据,同样的 chan <- time.Time 就是只写通道,只能接收数据,这两种都叫单向通道,可读可写的就是双向通道。
你可能认为有缓冲的通道比无缓冲的通道有用,事实并非如此,它们都有各自的应用场景,可以使用熟悉的 len 和 cap 函数获取通道当前的数据流长度和容量。
func lenCap() {
d1 := make(chan int)
d2 := make(chan int, 3)
d2 <- 1
fmt.Println(len(d1), cap(d1)) // 0 0
fmt.Println(len(d2), cap(d2)) // 1 3
}
有缓冲的通道的数据流可以使用 for-range 读取,一直到通道关闭为止。
func forRangeChan() {
queue := make(chan string, 3)
go func() {
queue <- "one"
queue <- "two"
queue <- "three"
close(queue) // 去掉 close for-range 将死锁
}()
for elem := range queue {
fmt.Println(elem)
}
}
看看下面的官网的打印斐波那契数列的例子:
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c) // 一定要 close,不然 for-range 会阻塞死锁
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}
有时候我们不关心 chan 通道里的值,就像做饭的第一个例子一样,只想表示某件事或者某个状态达成了,可以使用 chan struct {},通过结构体章节的学习知道 struct {} 空结构体是内存空间的,会被编译器特殊处理,它的内存地址固定的。做一个简单的服务器,这个服务器唯一的功能就是打印字符串和序号:
//server 服务实例
type server struct {
stopFlag chan struct{}
waitGroup sync.WaitGroup
}
//Work 做事
func (s *server) Work(index int) {
time.Sleep(50 * time.Microsecond)
fmt.Printf("干活儿 %d\n", index)
}
//Stop 停止
func (s *server) Stop() {
close(s.stopFlag) // 直接关闭通道
s.waitGroup.Wait() // 等带 handle 的 goroutine 都正常结束
}
func (s *server) handle(index int) {
s.waitGroup.Add(1)
go func() {
for {
select {
case <-s.stopFlag:
fmt.Printf("handle %d over", index)
s.waitGroup.Done()
return
default:
s.Work(index)
}
}
}()
}
按照 go 语言的习惯,添加一个工场函数:
func newServer() *server {
return &server{make(chan struct{}), sync.WaitGroup{}}
}
下面的代码创建 server 后,给它分配一点儿的工作,在 500 毫秒后通知它停下来:
func createServerTest() {
srv := newServer()
for i := 0; i < 3; i++ {
srv.handle(i)
}
select {
case <-time.After(500 * time.Microsecond):
srv.Stop()
}
}
这个服务器的例子使用无缓冲的通道实现了 “是否应该停止服务” 这个事件的通讯,很多编程语言中多个线程传递数据的方式是共享内存,因此需要对内存进行读写保护,go 语言也支持这种共享内存的模型,但是更提倡使用通讯顺序进程 CSP 来完并发操作,goroutine 就是 CSP 模型里的实体,chan 就是 CSP 模型里的传递信息的媒介,也就是 goroutine 通过 chan 来完成通讯(数据传递)。
通道 chan 通讯前面已经有很多例子了,相信你已经理解了,下面举例一个 chan 锁定的例子,它并不用来通讯,只用来保存一段指令对内存的操作。代码的功能是完成循环计数,不过你多次运行发现它并不能得到正确的结果,理论上应该得到 100,但是运行可能出现 99、98 等。
func inc(x *int) {
*x = *x + 1
}
func sumByLoop() {
x := 0
for i := 0; i < 100; i++ {
go inc(&x)
}
// 为了代码结构简洁,简单的加个停顿(应该使用 sync.waitGroup 实现)
time.Sleep(1 * time.Second)
fmt.Println(x)
}
因为 *x = *x + 1 不是原子操作,它点读取 x,累加后再写入,多个协程工作的时候,有的协程读取了老的 x 累加后会写入把其他协程的累加计数覆盖了,所以最终的结果可能小于 100,下面是修改后的版本,使用一个容量大小是 1 的有缓冲通道,来控制 *x = *x + 1,此处的 chan 并没有用来传递数据:
func inc(x *int, lock chan struct{}) {
lock <- struct{}{}
*x = *x + 1
<-lock
}
func sumByLoop() {
x := 0
lock := make(chan struct{}, 1)
for i := 0; i < 100; i++ {
go inc(&x, lock)
}
// 为了代码结构简洁,简单的加个停顿(应该使用 sync.waitGroup 实现)
time.Sleep(1 * time.Second)
fmt.Println(x)
}
再来运行,每次都是 100 了,因为建立的是有缓冲的通道,执行 lock <- struct{}{} 后,还能继续往下运行,又因为缓冲通道的大小只有 1,上一个协程在没有执行 <-lock 将通道清空之前,其他的协程只能等着把 struct{}{} 放进去,这样执行 *x = *x +1 的永远只有一个协程,保护了这段内存空间的操作,这是一个典型的互斥锁,可直接使用系统提供的 sync.Mutex 来实现:
func inc2(x *int, mutex *sync.Mutex) {
mutex.Lock()
*x = *x + 1
mutex.Unlock()
}
func sumByLoop2() {
x := 0
mutex := &sync.Mutex{}
for i := 0; i < 100; i++ {
go inc2(&x, mutex)
}
// 为了代码结构简洁,简单的加个停顿(应该使用 sync.waitGroup 实现)
time.Sleep(1 * time.Second)
fmt.Println(x)
}
回到协程通道的话题,有没有发现常见一个协程后,无法主动的强制去关闭它,最优雅的方式就是通知它让它自己退出,或者整个进程退出了。当需要管理很多个协程的状态(和很多子协程),使用 context 包也许更加清晰明了,它的接口申明如下:
type Context interface {
Deadline() (deadline time.Time, ok bool) // 到期时间
Done() <-chan struct{} // 做完了
Err() error // 报错信息
Value(key interface{}) interface{} // 绑定的值
}
Context 的几个构造函数:
1. context.WithCancel
2. context.WithDeadline
3. context.WithTimeout
4. context.WithValue
它们的第一个参数都是父 context,默认 context.Background 是根 context,根 context 不能不取消。
模仿前面 server 的功能:
func handle(ctx context.Context, index int) {
for {
select {
case <-ctx.Done():
fmt.Printf("handle %d over", index)
return
default:
fmt.Printf("干活了 %d\n", index)
}
}
}
func contextServerTest() {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 3; i++ {
go handle(ctx, i)
}
time.Sleep(1 * time.Second)
cancel() // 可以 defer cancel
time.Sleep(1 * time.Second)
}
服务器开启了 3 个协程工作,1 秒后手动 cancel 通知他们停下来,如果超时自动 cancel 可以使用 context.WithTimeout 或者 context.WithDeadline,一个是相对时间,一个绝对时间:
func contextServerTest2() {
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
//ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
//cancel() // 仍然可以手动提前结束
for i := 0; i < 3; i++ {
go handle(ctx, i)
}
time.Sleep(2 * time.Second)
}
不过这两个例子并没有演示 context 的优势,稍微修改一下,当父 context 被 cancel 的时候,子 context 也会遭殃,这样 goroutine 就好管理了,如此推导,是不是就理解了?
func contextServerTest3() {
parent, parentCancel := context.WithCancel(context.Background())
ctx1, _ := context.WithCancel(parent)
ctx2, _ := context.WithTimeout(parent, 10*time.Second)
for i := 0; i < 3; i++ {
go handle(ctx1, i)
}
for i := 3; i < 6; i++ {
go handle(ctx2, i)
}
time.Sleep(1 * time.Second)
parentCancel() // 子 ctx1,ctx2 也会遭殃
time.Sleep(1 * time.Second)
}
本章节的代码