在实践中,有时会遇到多个生产者合并消费的情况,今天就来用channel来实现这一需求。
上代码:
package main
import "fmt"
func main() {
a, b, c := make(chan interface{}), make(chan interface{}), make(chan interface{})
go func(in chan interface{}) {
defer close(in)
for i := 0; i < 10; i++ {
in <- i
}
}(a)
go func(in chan interface{}) {
defer close(in)
for i := 10; i < 30; i++ {
in <- i
}
}(b)
go func(in chan interface{}) {
defer close(in)
for i := 30; i < 60; i++ {
in <- i
}
}(c)
group := orChannel(a, b, c)
for v := range group {
fmt.Println(v)
}
}
func orChannel(in ...chan interface{}) chan interface{} {
if len(in) == 0 {
return nil
}
if len(in) == 1 {
return in[0]
}
result := make(chan interface{})
go func() {
defer close(result)
c1, c2 := in[0], orChannel(in[1:]...)
loop:
for {
select {
case v, ok := <-c1:
if !ok {
c1 = nil
} else {
result <- v
}
case v, ok := <-c2:
if !ok {
c2 = nil
} else {
result <- v
}
default:
if c1 == nil && c2 == nil {
break loop
}
}
}
}()
return result
}
关键点:
orChannel这个函数使用了递归调用的技巧,可以接收任意数量的channel,将他们合并成为一个channel进行统一消费。