您的位置 首页 golang

Golang之通道(Channel)

写在前面:

0x01 — 无缓存channel

定义一个channel,必须使用make函数:

 c1 := make(chan int)
c2 := make(chan struct{})
c3 := make(chan interface{})
c4 := make(chan string)  

channel通过操作符 <- 实现发送和接收数据

 c1 <- 12 // 将12发送到通道c1中
v1 := <- c1 // 从c1通道中获取值并赋给v1
<-c1 // 这个语句表示结果会被丢掉  

下面看下示例:

 package main

import (
   "testing"
   "time"
)

func sum(a []int, c chan int, t *testing.T) {
   var s int
   for _, i := range a {
      s = s + i
   }
   t.Log("计算完成,睡眠一秒")
   time.Sleep(time.Second)
   t.Log("睡眠结束,向通道发送计算结果")
   c <- s
}

func TestChannel(t *testing.T) {
   l1 := []int{12,3,4,5,6}
   var cha1 = make(chan int) // 定义一个无缓存通道
   go sum(l1, cha1, t)
   t.Log("调用并发,等待通道回传数据")
   r1 := <- cha1
   t.Log("获取计算结果:", r1)
}  

输出:

 === RUN   TestChannel
    channel_test.go:23: 调用并发,等待通道回传数据
    channel_test.go:13: 计算完成,睡眠一秒
    channel_test.go:15: 睡眠结束,向通道发送计算结果
    channel_test.go:25: 获取计算结果: 30
--- PASS: TestChannel (1.00s)
PASS  

以上代码中通过对通道通信,完成了数据共享,运行代码时会发现,程序会阻塞在24行,就是从通道获取信息的位置,这是由于,同步通道会在发数据或收数据时都必须在通道完整的情况下,如果有发没收,发送就会阻塞,如果有收没发,收取就会阻塞,通过这个逻辑可以进行并发线程之间的同步。

通过这种同步可以建立一个管道:

 package main

import (
   "testing"
   "time"
)

func TestChannel2(t *testing.T) {
   // 创建两个无缓冲通道
   numL := make(chan int)
   sqL := make(chan int)

   go func() {
      for i := 0;;i++{ // 创建无限遍历逻辑
         numL <- i // 将i送入通道
         time.Sleep(time.Second) //每一秒产生一个数
      }
   }()
   go func() {
      for {
         sqL <- (<- numL) * 2 //获取numL通道的数据进行乘2后加入到sqL通道
      }
   }()

   for {// 通过遍历不断从sqL通道中读取数据
      t.Log("sqL管道出来的值:", <- sqL)
   }
}  

输出:

 === RUN   TestChannel2
    channel_test.go:26: sqL管道出来的值: 0
    channel_test.go:26: sqL管道出来的值: 2
    channel_test.go:26: sqL管道出来的值: 4
    channel_test.go:26: sqL管道出来的值: 6
    channel_test.go:26: sqL管道出来的值: 8
go tool test2json: signal: interrupt  

通过无缓冲管道,可以让两个协程之间的逻辑进行先后同步执行。

我们将代码最后面一段代码修改下:

 c := 0 // 定义一个值
for {// 通过遍历不断从sqL通道中读取数据
   t.Log("sqL管道出来的值:", <- sqL)
   if c > 5 {
      close(sqL) // 遍历5次后,将sqL关闭
   } else {
      c++
   }
}  

输出:

 === RUN   TestChannel2
    channel_test.go:28: sqL管道出来的值: 0
    channel_test.go:28: sqL管道出来的值: 2
    channel_test.go:28: sqL管道出来的值: 4
    channel_test.go:28: sqL管道出来的值: 6
    channel_test.go:28: sqL管道出来的值: 8
    channel_test.go:28: sqL管道出来的值: 10
    channel_test.go:28: sqL管道出来的值: 12
    channel_test.go:28: sqL管道出来的值: 0
--- FAIL: TestChannel2 (6.01s)
panic: close of closed channel [recovered]
	panic: close of closed channel  

输出后抛出panic,这是由于向一个关闭的通道进行发送导致的。对于最后一个打印的值是0,这是由于通道关闭,接收端可以不阻塞地从关闭的通道中获取零值。目前没有办法直接监测一个通道是否被关闭,不过可以通过接收通道的状态来获取,在接收一个通道返回数据时,可以通过两个变量接收,第二个变量表示当前通道的状态:

 v,ok := <- numL
if !ok {
   break
}  

在使用过程中尽量注意在发送端关闭通道,同时对同步接收端尽量不要无穷遍历。

由于通道最好在发送端关闭,所以我们在使用通道时,可以对函数中的通道进行标记,是接收还是发送,一般情况下一个通道在函数中只担任一种功能,发送或接收,我们可以标记出通道的功能,如果对接收的通道进行关闭操作,会编译失败

 package main

import (
   "fmt"
   "testing"
   "time"
)
func left(out chan<- int) {// 标记chan为输入
   for x := 1; x < 10; x++ {
      out <- x
   }
   close(out)
}
func right(in <-chan int) { // 标记chan为输出
   for {
      v := <-in
      if v == 0 {
         break
      }
      fmt.Println("--->>>:", v)
   }
}
func TestChannel3(t *testing.T) {
   l := make(chan int)
   go left(l)
   right(l) // 这个函数不能在并发运行,否则将会直接结束
}  

输出:

 === RUN   TestChannel3
--->>>: 1
--->>>: 2
--->>>: 3
--->>>: 4
--->>>: 5
--->>>: 6
--->>>: 7
--->>>: 8
--->>>: 9
--- PASS: TestChannel3 (0.00s)
PASS  

使用过程中建议明确下。

0x02– 有缓存channel

带缓存的Channel内会存在一个队列,队列的容量在创建时需要指定,不带缓存channel其实是队列为0的channel。

 ch1 := make(chan int, 5) // 定义一个可以缓存5个元素的通道  

向带缓存Channel的发送操作会向其内部队列的尾部插入元素,接收操作则是从队列的头部移除元素。如果内部缓存队列是满的,那么发送操作将会阻塞直到一个接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有一个发送操作而向队列插入元素。

 package main

import (
   "fmt"
   "testing"
   "time"
)

func insert(c chan<- int){
   for x := 1; x < 7; x++ {
      time.Sleep(time.Second)
      fmt.Println("正在发送:", x)
      c <- x
   }
   close(c)
}
func readChan(c <-chan int){
   for {
      fmt.Println("正在读取")
      v, ok:= <-c
      if !ok {
         break
      }
      fmt.Println("完成读取:", v)
      time.Sleep(time.Second*5) // 读取一次后,sheep 5秒,gen
   }
}

func TestChannel4(t *testing.T) {
   ch1 := make(chan int, 3) // 定义一个可以缓存3个元素的通道
   go insert(ch1)
   readChan(ch1)
}  

输出:

 === RUN   TestChannel4
正在读取
正在发送: 1
完成读取: 1
正在发送: 2
正在发送: 3
正在发送: 4
正在发送: 5
正在读取
完成读取: 2
正在发送: 6
正在读取
完成读取: 3
正在读取
完成读取: 4
正在读取
完成读取: 5
正在读取
完成读取: 6
正在读取
--- PASS: TestChannel4 (31.02s)
PASS  

从输出可以看出,在 readChan 函数中,第一次读取了一个,然后睡眠5秒,在这期间,insert函数里将队列充满,在 readChan 函数第二次读取后,insert函数又向channel发送了最后一个数据,并且关闭channel,之后每隔5秒钟,readChan都会读取其中的数据,直到读取状态为false,跳出。

接收通道信息还有一种方式可以方便的便利,上面代码中我们通过for循环类似while的方式实现,同时可以使用range来接收数据,我们改下上面的readChan函数逻辑:

 func readChan(c <-chan int){
   for v:= range c{
      fmt.Println("完成读取:", v)
      time.Sleep(time.Second*5) // 读取一次后,sheep 5秒,gen
   }
   fmt.Println("通道已关闭!")
}  

看下输出:

 === RUN   TestChannel4
正在发送: 1
完成读取: 1
正在发送: 2
正在发送: 3
正在发送: 4
正在发送: 5
完成读取: 2
正在发送: 6
完成读取: 3
完成读取: 4
完成读取: 5
完成读取: 6
通道已关闭!
--- PASS: TestChannel4 (31.03s)
PASS  

range遍历通道的数据类似slice ,如果发送端显式的关闭通道,则range通道自动遍历结束!

0x03 — 总结

在开发程序时,需要注意,只有生产者发送端才可以关闭channel,接收者关闭可能会导致panic,另外,channel不像文件一样需要经常关闭,只有当你确认不再发送数据时,在进行关闭即可。

文章来源:智云一二三科技

文章标题:Golang之通道(Channel)

文章地址:https://www.zhihuclub.com/86690.shtml

关于作者: 智云科技

热门文章

发表评论

您的电子邮箱地址不会被公开。

网站地图