您的位置 首页 golang

大白话 golang 教程-16-并发任务的固定模式

虽然我们可以自己使用 chan 来实现很多并发的框架和模式,但实际应用的时候先查查 sync 包总是没错的,比如 sync/atomic 包对原子操作进行了支持,上一个章节的 *x = *x +1,其实可以直接使用 atomic.AddUint64(&x, 1) 来实现。

标准库 sync.Once 可以实现只执行一次的功能,比如你指向创建一个对象(单件)。下面这个 born 函数永远只会造同一个人:

 type person struct{}

var instance *person
var once sync.Once

func born() *person {
  once.Do(func() {
    fmt.Println("构建 instance 对象")
    instance = &person{}
  })
  return instance
}  

我们启动 10 个 goroutine 来测试一下:

 func getInstance() {
  wg := sync.WaitGroup{}
  wg.Add(10)

  for i := 0; i < 10; i++ {
    go func() {
      defer wg.Done()
      instance := born()
      fmt.Printf("%p\n", instance) // 0x1254790
    }()
  }

  wg.Wait()
}  

每次获取到的 instance 的地址都是一样的,说明是同一个对象,它的实现机制如下:

 func (o *Once) Do(f func()) {
  if atomic.LoadUint32(&o.done) == 0 {
    o.doSlow(f)
  }
}

func (o *Once) doSlow(f func()) {
  o.m.Lock()
  defer o.m.Unlock()
  if o.done == 0 {
    defer atomic.StoreUint32(&o.done, 1)
    f()
  }
}  

首先是对通过 mutex 和 atomic.StoreUint32 配合来实现,看到这里 doSlow 函数就奇怪了,你不是有 mutex 吗? 为什么还要来一发 aotmic 的函数操作呢? doSlow 函数名其实已经说明了一切。doSlow 说明这个函数很慢,因为 mutex 是一个成本很高的操作,而整数的原子读写却轻量很多,所以 atomic.LoadUint32 先判断了一下 o.done 是不是为 0,不是的话就滚蛋了,少量的竞争者最后能到达 if o.done == 0 的判断,这个提前的判断提高了性能(请注意 defer 语句执行的顺序,对 mutex 解锁 Unlock 比设置 o.done 为 1 的操作后执行。

sync/atomic 不仅能对基本的数值类型进行原子操作,atomic.Value 还能对 interface{} 数据类型进行操作。

既然系统包这么好用,为什么不把所有的应用模式封装完呢? 有时候系统包并不好用,看下面的 mutex 的例子:

 func main() {
  var mu sync.Mutex
  go func(){
    fmt.Println("NO.1中国人")
    mu.Lock()
  }()
  mu.Unlock()
}  

程序的意图简洁清晰明了,看起来好像没有什么问题,但是执行大概率是报错:

 fatal error: sync: unlock of unlocked mutex  

因为 go func 的 goroutine 和 main 的 goroutine 执行顺序并不能保证谁先谁后,mu.Unlock 的时候 mu.Lock 还未执行,所以 Unlock 直接抛出了错误。要修改它也很麻烦,要这样写:

 func main() {
  var mu sync.Mutex
  mu.Lock
  go func(){
    fmt.Println("NO.1中国人")
    mu.Unock()
  }()
  mu.lock()
}  

是不是不好理解? 第二个 mu.Lock 的时候会等着 mu.Unlock 先执行,有点绕口。但是如果使用无缓冲的通道来重构代码不但简单而且很可靠:

 func main() {
  exit := make(chan int)
  go func() {
    fmt.Println("NO.1中国人")
    <-exit
  }()
  exit <- 1
}  

为什么很可靠? 因为 exit 没有缓冲,不管 exit <- 1 先执行到 还是 <-exit 先执行到,都得等待同步交接,咱们谁先到都见面聊。这个例子中 <- exit 和 exit <- 1 的位置可以交换,他们的读写本身并没有意义,chan 的类型也是无意义的,用 string、struct{} 都行,但是 chan 完成了比 mutex 很易懂的代码。把这个例子扩展一下,启动指定数量的任务:

 func lockBatch(taskCount int) {
  waitGroup := make(chan struct{}, taskCount)
  for i := 0; i < cap(waitGroup); i++ {
    go func(index int) {
      fmt.Printf("NO.1中国人%d\n", index)
      waitGroup <- struct{}{}
    }(i)
  }
  for i := 0; i < cap(waitGroup); i++ {
    <-waitGroup
  }
}  

这不就是 sync.WaitGroup 干的事情吗? 这就是框架,或者说模式。

这个模式可以启动 N 个协程,来做 N 件事情,每个协程和任务的比例是 1:1,不过太多的协程可能不利于管理和回收,能不能抽象出 N 个协程做 M 件事情的模式呢? 为了让 goroutine 可以不断的做任务,可以使用一个有缓冲的通道来保存任务,极端的情况是只有一个 goroutine 来做任务,它不断的从缓冲通道里取出任务执行,知道通道关闭为止,由于不同的 goroutine 可以安全的从通道中读取任务,因此可以随意调节 goroutine 的数目而不用修改代码。定一个 handle 函数表示做任务的工人:

 var wg sync.WaitGroup

func handle(tasks chan string, index int) {
  defer wg.Done()
  for {
    // 读取关闭的无缓冲通道,返回零值和false
    // 读取关闭的有缓冲通道,先把缓存数据读完后,再读取返回零值和false
    task, ok := <-tasks
    if !ok {
      // 任务通道关闭了,不会再有新任务了
      fmt.Printf("handle %d over\n", index)
      return
    }
    fmt.Printf("干活儿 %s By 工人 %d\n", task, index)
  }
}  

tasks 表示任务的需求,如果只需要一个工人,直接 go handle 就行了,多个工人只需一个循环,工人下班的标志依靠 wg 来保证,干完活就能下班,而 tasks 通道用来接收任务。

 func createTasks(goroutines, taskCount int) {
  tasks := make(chan string, taskCount)

  // 启动 N
  wg.Add(goroutines)
  for i := 0; i < goroutines; i++ {
    go handle(tasks, i)
  }

  // 设置 M
  for k := 0; k <= taskCount; k++ {
    tasks <- fmt.Sprintf("任务 %d", k)
  }

  close(tasks)
  wg.Wait()
}  

参数 goroutines 是工人的个数(协程的个数),而 taskCount 是任务的数量。需要注意的时候,在通道 close 关闭后,通道中的数据依然可以读取直到读完后再次读取事 task 是零值(这里是空字符串,因为 chan 的类型是 string),ok 为 false,也就是没有任务了,工人都可以下班了(wg.Done),而 wg.Wait 会等所有工人都下班后,才把工厂的门关上(函数返回)。如此,就完成了 N:M 的任务模型,这里 N 和 M 可以通过调用参数传递数量,而你不用修改内部的代码。

进一步延伸这个例子,就是生产者和消费者,而生产者和消费者的数量可以是任何比例。

 func producer(name string, tasks chan<- string) {
  index := 0
  for {
    tasks <- fmt.Sprintf("%s-task-%d", name, index)
    time.Sleep(50 * time.Millisecond)
    index++
  }
}

func consumer(name string, tasks <-chan string) {
  for task := range tasks {
    fmt.Printf("%s-%s has been token\n", name, task)
    time.Sleep(50 * time.Millisecond)
  }
}  

producer 生产者是个是循环,源源不断的造东西,而 consumer 一直从 tasks 里取货,模拟一个工厂的代码:

 func createFactory(stockSize, producerCount, consumerCount int) {
  tasks := make(chan string, stockSize)
  for i := 0; i <= producerCount; i++ {
    go producer(fmt.Sprintf("p%d", i), tasks)
  }

  for i := 0; i <= consumerCount; i++ {
    go consumer(fmt.Sprintf("c%d", i), tasks)
  }

  exit := make(chan os.Signal, 1)
  // ctrl+c 中断程序
  signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
  <-exit
}  

stockSize 是车间大小,表示了最多能存多少货,producerCount 是生产者个数,consumerCount 是消费者个数,当生产者 pdoducer 足够多的时候,把仓库 stockSize 塞满了就阻塞了没法再生产了,除非消费者 consumer 来拿走一些;反之如果消费者足够多的时候,仓库里没货也只能等着了,除非生产者 producer 造点货出来。

为了达到平衡,如果希望生产者以稳定的效率工作,不多不少,如何控制呢? 非常简单:

 func producer2(max int, tasks chan<- string, doSth func() string) {
  lines := make(chan int, max)
  for {
    lines <- 1
    tasks <- doSth()
    <-lines
  }
}  

max 表示最大的生产线,每次开一个生产线,就往通道里计数一个,生产完了,再从通道取出一个,因为通道最大是 max,满了就阻塞了,随时都保证了只有 max 条生产线存在。

下面的这个例子实现的功能是,等待一个对象完成自己的工作,和前面不同的是,它把 chan 的状态作为对象的一部分:

 type request struct {
  data   []interface{} // 模拟要处理的数据
  finish chan struct{} // 完成后的标志
}

func newRequest(data ...interface{}) *request {
  return &request{data, make(chan struct{}, 1)}
}

func doWork(req *request) {
  time.Sleep(1 * time.Second) // 模拟耗时
  fmt.Println(req.data)       // 工作内容就是打印
  close(req.finish)           // req.finish <- struct{}{} 也行
}

func createWork() {
  req := newRequest(1, "string", true)
  go doWork(req)

  for i := 0; i < 100; i++ {
    time.Sleep(50 * time.Millisecond)
    fmt.Println("继续做事,我很忙")
  }

  <-req.finish
}  

createWork 函数通过并发提高了处理效率,单独开了一条线去处理 request,同时它可以继续往后做事,但是它保证在函数返回前 req 对象肯定被处理了。关于 chan 和并发,还有很多有用的固定模式,希望这几个例子可以抛砖引玉。

本章节的代码

文章来源:智云一二三科技

文章标题:大白话 golang 教程-16-并发任务的固定模式

文章地址:https://www.zhihuclub.com/87280.shtml

关于作者: 智云科技

热门文章

发表评论

您的电子邮箱地址不会被公开。

网站地图