您现在的位置是:首页 >学无止境 >Go 并发之channel(通道)网站首页学无止境
Go 并发之channel(通道)
一、前言
作为 Go 语言最有特色的数据类型,通道(channel)完全可以与 goroutine(也可称为 go 程)并驾齐驱,共同代表 Go 语言独有的并发编程模式和编程哲学。
通道(channel)可以利用通道在多个 goroutine 之间传递数据。
二、什么是通道
2.1 通道声明和入门
通道类型的值本身就是并发安全的,这也是 Go 语言自带的、唯一一个可以满足并发安全性的类型。它使用起来十分简单。
在声明并初始化一个通道的时候,需要用到 Go 语言的内建函数make。就像用make初始化切片那样,传给这个函数的第一个参数是代表了通道的具体类型的类型字面量。
在声明一个通道类型变量的时候,首先要确定该通道类型的元素类型,这决定了可以通过这个通道传递什么类型的数据。
- 比如,类型字面量chan int,其中的chan是表示通道类型的关键字,而int则说明了该通道类型的元素类型。
- 比如,chan string代表了一个元素类型为string的通道类型
在初始化通道的时候,make函数除了必须接收这样的类型字面量作为参数,还可以接收一个int类型的参数。后者是可选的,用于表示该通道的容量。
所谓通道的容量,就是指通道最多可以缓存多少个元素值。
由此,虽然这个参数是int类型的,但是它是不能小于0的。
- 当容量为0时,可以称通道为非缓冲通道,也就是不带缓冲的通道。
- 当容量大于0时,可以称为缓冲通道,也就是带有缓冲的通道。
非缓冲通道和缓冲通道有着不同的数据传递方式。
一个通道相当于一个先进先出(FIFO)的队列。也就是说,通道中的各个元素值都是严格地按照发送的顺序排列的,先被发送通道的元素值一定会先被接收。
元素值的发送和接收都需要用到操作符<-。也可以叫它接送操作符。一个左尖括号紧接着一个减号形象地代表了元素值的传输方向。
代码示例
package main
import "fmt"
func main() {
// 声明并初始化了一个元素类型为int、容量为3的通道ch1
ch1 := make(chan int, 3)
// 向该通道先后发送了三个元素值2、1和3
ch1 <- 2
ch1 <- 1
ch1 <- 3
// 语句elem1 := <-ch1会将最先进入ch1的元素2接收来并存入变量elem1
elem1 := <-ch1
fmt.Printf("The first element received from channel ch1: %v
",elem1)
}
由于该通道的容量为 3,所以,可以在通道不包含任何元素值的时候,连续地向该通道发送三个值,此时这三个值都会被缓存在通道之中。
当需要从通道接收元素值的时候,同样要用接送操作符<-,只不过,这时需要把它写在变量名的左边,用于表达“要从该通道接收一个元素值”的语义。比如:<-ch1,这也可以被叫做接收表达式。
在一般情况下,接收表达式的结果将会是通道中的一个元素值。如果需要把如此得来的元素值存起来,那么在接收表达式的左边就需要依次添加赋值符号(=或:=)和用于存值的变量的名字。
2.2 通道的发送与接收特性
Go 语言中最常见的、也是经常被人提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存
。
在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与 Go 语言鼓励的设计并不相同。下面是多线程之间使用共享内存实现传递数据图示。
虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)1。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。
上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。
发送和接收的基本特性
- 对于同一个通道,发送操作之间是互斥的,接收操作之间也是互斥的。
- 发送操作和接收操作中对元素值的处理都是不可分割的。
- 发送操作在完全完成之前会被阻塞。接收操作也是如此。
下面是对三个特性详细的分析和说明:
(1)通道的第一个基本特性
在同一时刻,Go 语言的运行时系统(以下简称运行时系统)只会执行对同一个通道的任意个发送操作中的某一个。直到这个元素值被完全复制进该通道之后,其他针对该通道的发送操作才可能被执行。
类似的,在同一时刻,运行时系统也只会执行,对同一个通道的任意个接收操作中的某一个。直到这个元素值完全被移出该通道之后,其他针对该通道的接收操作才可能被执行。即使这些操作是并发执行的也是如此。
这里所谓的并发执行,可以这样认为,多个代码块分别在不同的 goroutine 之中,并有机会在同一个时间段内被执行。
另外,对于通道中的同一个元素值来说,发送操作和接收操作之间也是互斥的。例如,虽然会出现,正在被复制进通道但还未复制完成的元素值,但是这时它绝不会被想接收它的一方看到和取走。
这里要注意的一个细节是
,元素值从外界进入通道时会被复制。更具体地说,进入通道的并不是在接收操作符右边的那个元素值,而是它的副本。
另一方面,元素值从通道进入外界时会被移动。这个移动操作实际上包含了两步,
- 第一步是生成正在通道中的这个元素值的副本,并准备给到接收方;
- 第二步是删除在通道中的这个元素值;
(2)通道的第二个基本特性
这里的“不可分割”的意思是,它们处理元素值时都是一气呵成的,绝不会被打断。
- 例如,发送操作要么还没复制元素值,要么已经复制完毕,绝不会出现只复制了一部分的情况。
- 例如,接收操作在准备好元素值的副本之后,一定会删除掉通道中的原值,绝不会出现通道中仍有残留的情况。
这既是为了保证通道中元素值的完整性,也是为了保证通道操作的唯一性。对于通道中的同一个元素值来说,它只可能是某一个发送操作放入的,同时也只可能被某一个接收操作取出。
(3)通道的第三个基本特性
一般情况下,发送操作包括了“复制元素值”和“放置副本到通道内部”这两个步骤。在这两个步骤完全完成之前,发起这个发送操作的那句代码会一直阻塞在那里。也就是说,在它之后的代码不会有执行的机会,直到这句代码的阻塞解除。
更细致地说,在通道完成发送操作之后,运行时系统会通知这句代码所在的 goroutine,以使它去争取继续运行代码的机会。另外,接收操作通常包含了“复制通道内的元素值”“放置副本到接收方”“删掉原值”三个步骤。
在所有这些步骤完全完成之前,发起该操作的代码也会一直阻塞,直到该代码所在的 goroutine 收到了运行时系统的通知并重新获得运行机会为止。说到这里,可能已经感觉到,如此阻塞代码其实就是为了实现操作的互斥和元素值的完整。
三、通道的操作和使用
通道(channel)是用来传递数据的一个数据结构。
通道可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。操作符<-
用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。
// 声明一个通道很简单,我们使用chan关键字即可,通道在使用前必须先创建:
ch := make(chan int)
ch <- v // 把 v 发送到通道 ch
v := <-ch // 从 ch 接收数据
// 并把值赋给 v
注意
:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须有接收端相应的接收数据。
3.1 通道缓冲区
通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:
ch := make(chan int, 100)
带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。
不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。
注意:
- 如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。
- 如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;
- 如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。
代码示例
package main
import "fmt"
func main() {
// 这里我们定义了一个可以存储整数类型的带缓冲通道
// 缓冲区大小为2
ch := make(chan int, 2)
// 因为 ch 是带缓冲的通道,我们可以同时发送两个数据
// 而不用立刻需要去同步读取数据
ch <- 1
ch <- 2
// 获取这两个数据
fmt.Println(<-ch)
fmt.Println(<-ch)
}
3.2 单向通道
在说“通道”的时候指的都是双向通道,即:既可以发也可以收的通道。所谓单向通道就是,只能发不能收,或者只能收不能发的通道。
一个通道是双向的,还是单向的是由它的类型字面量体现的。如果把接收操作符<-用在通道的类型字面量中,那么它代表的就不是“发送”或“接收”的动作了,而是表示通道的方向。
比如声明并初始化一个名叫uselessChan的变量,这个变量的类型是chan<- int,容量是1。
// 紧挨在关键字chan右边的那个<-,这表示了这个通道是单向的,并且只能发而不能收。
// 接收通道
var uselessChan = make(chan<- int, 1)
// 类似的如果这个操作符紧挨在chan的左边,那么就说明该通道只能收不能发
// 发送通道
var uselessChan = make(<-chan int, 1)
注意,与发送操作和接收操作对应,这里的“发”和“收”都是站在操作通道的代码的角度上说的。
从上述变量的名字上也能猜到,这样的通道是没用的。通道就是为了传递数据而存在的,声明一个只有一端(发送端或者接收端)能用的通道没有任何意义。那么,单向通道的用途究竟在哪呢?
3.3 通道遍历和关闭
Go 通过 range 关键字来实现遍历读取到的数据,类似于与数组或切片。格式如下:v, ok := <-ch
如果通道接收不到数据后 ok 就为 false,这时通道就可以使用 close() 函数来关闭。
package main
import (
"fmt"
)
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
// range 函数遍历每个从通道接收到的数据,因为 c 在发送完 10 个
// 数据之后就关闭了通道,所以这里我们 range 函数在接收到 10 个数据
// 之后就结束了。如果上面的 c 通道不关闭,那么 range 函数就不
// 会结束,从而在接收第 11 个数据的时候就阻塞了。
for i := range c {
fmt.Println(i)
}
}
四、通道实践的几大坑
在使用 channel 进行 goroutine 之间的通信时,有时候场面会变得十分复杂,以至于写出难以觉察、难以定位的偶现 bug,而且上线的时候往往跑得好好的,直到某一天深夜收到服务挂了、OOM 了之类的告警……
来梳理一下使用 channel 中常见的三大坑:panic、死锁、内存泄漏,做到防患于未然。
4.1 死锁
go 语言新手在编译时很容易碰到这个死锁的问题:
fatal error: all goroutines are asleep - deadlock!
这个就是喜闻乐见的「死锁」了…… 在操作系统中,学过「死锁」就是两个线程互相等待,耗在那里,最后程序不得不终止。
go 语言中的「死锁」也是类似的,两个 goroutine 互相等待,导致程序耗在那里,无法继续跑下去。看了很多死锁的案例后,channel 导致的死锁可以归纳为以下几类案例(先讨论 unbuffered channel 的情况)
4.1.1 只有生产者,没有消费者,或者反过来
channel 的生产者和消费者必须成对出现,如果缺乏一个,就会造成死锁,例如:
// 只有生产者,没有消费者
func f1() {
ch := make(chan int)
ch <- 1
}
// 只有消费者,没有生产者
func f2() {
ch := make(chan int)
<-ch
}
4.1.2 生产者和消费者出现在同一个 goroutine 中
除了需要成对出现,还需要出现在不同的 goroutine 中,例如:
// 同一个 goroutine 中同时出现生产者和消费者
func f3() {
ch := make(chan int)
ch <- 1 // 由于消费者还没执行到,这里会一直阻塞住
<-ch
}
对于 buffered channel 则是下面这种情况
4.1.3 buffered channel 已满,且在同一个goroutine中
buffered channel 会将收到的元素先存在 hchan 结构体的 ringbuffer 中,继而才会发生阻塞。而当发生阻塞时,如果阻塞了主 goroutine ,则也会出现死锁
所以实际使用中,推荐尽量使用 buffered channel ,使用起来会更安全,在下文的「内存泄漏」相关内容也会提及。
4.2 内存泄漏
内存泄漏一般都是通过 OOM(Out of Memory) 告警或者发布过程中对内存的观察发现的,服务内存往往都是缓慢上升,直到被系统 OOM 掉清空内存再周而复始。
在 go 语言中,错误地使用 channel 会导致 goroutine 泄漏,进而导致内存泄漏。
4.2.1 如何实现 goroutine 泄漏呢?
不会修 bug,还不会写 bug 吗?让 goroutine 泄漏的核心就是:
生产者/消费者 所在的 goroutine 已经退出,而其对应的 消费者/生产者 所在的 goroutine 会永远阻塞住,直到进程退出
4.2.2 生产者阻塞导致泄漏
一般会用 channel 来做一些超时控制,例如下面这个例子:
func leak1() {
ch := make(chan int)
// g1
go func() {
time.Sleep(2 * time.Second) // 模拟 io 操作
ch <- 100 // 模拟返回结果
}()
// g2
// 阻塞住,直到超时或返回
select {
case <-time.After(500 * time.Millisecond):
fmt.Println("timeout! exit...")
case result := <-ch:
fmt.Printf("result: %d
", result)
}
}
这里用 goroutine g1 来模拟 io 操作,主 goroutine g2 来模拟客户端的处理逻辑,
(1)假设客户端超时为 500ms,而实际请求耗时为 2s,则 select 会走到 timeout 的逻辑,这时 g2 退出,channel ch 没有消费者,会一直在等待状态,输出如下:
Goroutine num: 1
timeout! exit...
Goroutine num: 2
如果这是在 server 代码中,这个请求处理完后,g1 就会挂起、发生泄漏了,就等着 OOM 吧 。
(2)假设客户端超时调整为 5000ms,实际请求耗时 2s,则 select 会进入获取 result 的分支,输出如下:
Goroutine num: 1
result: 100
Goroutine num: 1
4.2.3 消费者阻塞导致泄漏
如果生产者不继续生产,消费者所在的 goroutine 也会阻塞住,不会退出,例如:
func leak2() {
ch := make(chan int)
// 消费者 g1
go func() {
for result := range ch {
fmt.Printf("result: %d
", result)
}
}()
// 生产者 g2
ch <- 1
ch <- 2
time.Sleep(time.Second) // 模拟耗时
fmt.Println("main goroutine g2 done...")
}
这种情况下,只需要增加 close(ch) 的操作即可,for-range 操作在收到 close 的信号后会退出、goroutine 不再阻塞,能够被回收。
4.2.4 如何预防内存泄漏
预防 goroutine 泄漏的核心就是:创建 goroutine 时就要想清楚它什么时候被回收。
具体到执行层面,包括:
- 当 goroutine 退出时,需要考虑它使用的 channel 有没有可能阻塞对应的生产者、消费者的 goroutine;
- 尽量使用 buffered channel使用 buffered channel 能减少阻塞发生、即使疏忽了一些极端情况,也能降低 goroutine 泄漏的概率;
4.3 panic
panic 就更刺激了,一般是测试的时候没发现,上线之后偶现,程序挂掉,服务出现一个超时毛刺后触发告警。channel 导致的 panic 一般是以下几个原因:
4.3.1 向已经 close 掉的 channel 继续发送数据
先举一个简单的栗子:
func p1() {
ch := make(chan int, 1)
close(ch)
ch <- 1
}
// panic: send on closed channel
在实际开发过程中,处理多个 goroutine 之间协作时,可能存在一个 goroutine 已经 close 掉 channel 了,另外一个不知道,也去 close 一下,就会 panic 掉,例如:
func p1() {
ch := make(chan int, 1)
done := make(chan struct{}, 1)
go func() {
<- time.After(2*time.Second)
println("close2")
close(ch)
close(done)
}()
go func() {
<- time.After(1*time.Second)
println("close1")
ch <- 1
close(ch)
}()
<-done
}
万恶之源就是在 go 语言里,是无法知道一个 channel 是否已经被 close 掉的,所以在尝试做 close 操作的时候,就应该做好会 panic 的准备……
4.3.2 多次 close 同一个 channel
同上,在尝试往 channel 里发送数据时,就应该考虑
- 这个 channel 已经关了吗?
- 这个 channel 什么时候、在哪个 goroutine 里关呢?
- 谁来关呢?还是干脆不关?
4.4 如何优雅地 close channel
4.4.1 需要检查 channel 是否关闭吗?
刚遇到上面说的 panic 问题时,也试过去找一个内置的 closed 函数来检查关闭状态,结果发现,并没有这样一个函数……
那么,如果有这样的函数,真能彻底解决 panic 的问题么?答案是不能。因为 channel 是在一个并发的环境下去做收发操作,就算当前执行 closed(ch) 得到的结果是 false,还是不能直接去关,例如代码:
if !closed(ch) { // 返回 false
// 在这中间出了幺蛾子!
close(ch) // 还是 panic 了……
}
遵循 less is more 的原则,这个 closed 函数是要不得了
4.4.2 需要 close 吗?为什么?
结论:除非必须关闭 chan,否则不要主动关闭。关闭 chan 最优雅的方式,就是不要关闭 chan~
当一个 chan 没有 sender 和 receiver 时,即不再被使用时,GC 会在一段时间后标记、清理掉这个 chan。那么什么时候必须关闭 chan 呢?
比较常见的是将 close 作为一种通知机制,尤其是生产者与消费者之间是 1:M 的关系时,通过 close 告诉下游:我收工了,你们别读了。
4.4.3 谁来关?
chan 关闭的原则:
- Don’t close a channel from the receiver side 不要在消费者端关闭 chan
- Don’t close a channel if the channel has multiple concurrent senders 有多个并发写的生产者时也别关
只要遵循这两条原则,就能避免两种 panic 的场景,即:向 closed chan 发送数据,或者是 close 一个 closed chan。
按照生产者和消费者的关系可以拆解成以下几类情况:
- 一写一读:生产者关闭即可
- 一写多读:生产者关闭即可,关闭时下游全部消费者都能收到通知
- 多写一读:多个生产者之间需要引入一个协调 channel 来处理信号
- 多写多读:与 3 类似,核心思路是引入一个中间层以及使用 try-send 的套路来处理非阻塞的写入.
代码示例
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
dataCh := make(chan int)
stopCh := make(chan struct{})
// stopCh 是额外引入的一个信号 channel.
// 它的生产者是下面的 toStop channel,
// 消费者是上面 dataCh 的生产者和消费者
toStop := make(chan string, 1)
// toStop 是拿来关闭 stopCh 用的,由 dataCh 的生产者和消费者写入
// 由下面的匿名中介函数(moderator)消费
// 要注意,这个一定要是 buffered channel (否则没法用 try-send 来处理了)
var stoppedBy string
// moderator
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
// try-send 操作
// 如果 toStop 满了,就会走 default 分支啥也不干,也不会阻塞
select {
case toStop <- "sender#" + id:
default:
}
return
}
// try-receive 操作,尽快退出
// 如果没有这一步,下面的 select 操作可能造成 panic
select {
case <- stopCh:
return
default:
}
// 如果尝试从 stopCh 取数据的同时,也尝试向 dataCh
// 写数据,则会命中 select 的伪随机逻辑,可能会写入数据
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// 同上
select {
case <- stopCh:
return
default:
}
// 尝试读数据
select {
case <- stopCh:
return
case value := <-dataCh:
if value == Max-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
五、总结
Channel 是 Go 语言能够提供强大并发能力的原因之一,我们在这一节中分析了 Channel 的设计原理、数据结构以及发送数据、接收数据和关闭 Channel 这些基本操作,相信能够帮助大家更好地理解 Channel 的工作原理。
参考材料