您现在的位置是:首页 >学无止境 >Golang Channel 实现原理与源码分析网站首页学无止境

Golang Channel 实现原理与源码分析

Pistachiout 2024-07-20 00:01:02
简介Golang Channel 实现原理与源码分析

Do not communicate by sharing memory; instead, share memory by communicating.
通过通信来共享内存,而不是共享内存来通信

安全访问共享变量是并发编程的一个难点,在 Golang 语言中,倡导通过通信共享内存,实际上就是使用 channel 传递共享变量,在任何给定时间,只有一个 goroutine 可以访问该变量的值,从而避免发生数据竞争。
本文关键是对Channel 实现原理进行分析,并附带源码解读,基于源码分析能更加理解Channel实现的过程与原因,对于源码关键步骤及变量给出了注释,不需要完全读懂源码的每个变量及函数,但可以从代码的异常处理角度来理解Channel,就能明白为什么channel的创建、写入、读取、关闭等流程需要分为多种情况。

1.Channel 数据结构

1.1 hchan结构体

读 channel 的源码,可以发现 channel 的数据结构是 hchan 结构体,包含以下字段,每个字段的含义已注释:

type hchan struct {
 qcount   uint           // 当前 channel 中存在多少个元素;
 dataqsiz uint           // 当前 channel 能存放的元素容量;
 buf      unsafe.Pointer // channel 中用于存放元素的环形缓冲区;
 elemsize uint16        //channel 元素类型的大小;
 closed   uint32		//标识 channel 是否关闭;
 elemtype *_type 		// channel 元素类型;
 sendx    uint   		// 发送元素进入环形缓冲区的 index;
 recvx    uint   		// 接收元素所处的环形缓冲区的 index;
 recvq    waitq  		// 因接收而陷入阻塞的协程队列;
 sendq    waitq  		// 因发送而陷入阻塞的协程队列;
 lock mutex				//互斥锁,保证同一时间只有一个协程读写 channel
}

通过阅读 channel 的数据结构,可以发现 channel 是使用环形队列作为 channel 的缓冲区,datasize 环形队列的长度是在创建 channel 时指定的,通过 sendx 和 recvx 两个字段分别表示环形队列的队尾和队首,其中,sendx 表示数据写入的位置,recvx 表示数据读取的位置。

字段 recvq 和 sendq 分别表示等待接收的协程队列和等待发送的协程队列,当 channel 缓冲区为空或无缓冲区时,当前协程会被阻塞,分别加入到 recvq 和 sendq 协程队列中,等待其它协程操作 channel 时被唤醒。其中,读阻塞的协程被写协程唤醒,写阻塞的协程被读协程唤醒。

字段 elemtype 和 elemsize 表示 channel 中元素的类型和大小,需要注意的是,一个 channel 只能传递一种类型的值,如果需要传递任意类型的数据,可以使用 interface{} 类型。

字段 lock 是保证同一时间只有一个协程读写 channel。
在这里插入图片描述

1.2 阻塞协程队列waitq与sudog结构体

在hchan中我们可以看到 recvq与sendq都是waitq类型,这代表协程等待队列。这个队列维护阻塞在一个channel上的所有协程。first和last是指向sudog结构体类型的指针,表示队列的头和尾。waitq里面连接的是一个sudog双向链表,保存的是等待的goroutine。队列中的sudog也是一个结构体,代表一个协程/sync.Mutex等待队列中的节点,包含了协程和数据的信息。waitq与sudog结构体包含以下字段,每个字段的含义已注释:

type waitq struct {		//阻塞的协程队列
    first *sudog 		//队列头部
    last  *sudog		//队列尾部
}
type sudog struct {		//sudog:包装协程的节点
    g *g				//goroutine,协程;

    next *sudog			//队列中的下一个节点;
    prev *sudog			//队列中的前一个节点;
    elem unsafe.Pointer //读取/写入 channel 的数据的容器;
    
    isSelect bool		//标识当前协程是否处在 select 多路复用的流程中;
    
    c        *hchan 	//标识与当前 sudog 交互的 chan.
}

在这里插入图片描述

2.Channel构造器函数

2.1 Channel常见类型

  • 无缓冲型Channel:常用于同步的场景,比如协调两个或多个并发goroutine之间的执行,传递临界资源等。

  • 有缓冲的 struct 型Channel:常用于单向传输数据流,例如将producer和consumer分开,这样可以避免不必要的等待时间。

  • 有缓冲的 pointer 型Channel:有缓冲的 pointer 型Channel位于管道中的元素是指针类型的变量。它常被用于异步数据传输,将消费者的读取数据和生产者的填充数据分离。

2.2 Channel构造器函数源码分析

func makechan(t *chantype, size int) *hchan {
    elem := t.elem	//Channel中元素类型
    
    // 每个元素的内存大小为elem.size,channel的容量为size,计算出总内存mem
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:				//无缓冲型Channel
   		 //hchanSize默认为96
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // 竞争检测器使用此位置进行同步。
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:		//有缓冲的 struct 型Channel
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:					//有缓冲的 pointer 型Channel
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    // 初始化hchan
    c.elemsize = uint16(elem.size)		//每个元素在内存中占用的字节数
    c.elemtype = elem					//元素类型
    c.dataqsiz = uint(size)				//队列中元素的数量上限
    
    lockInit(&c.lock, lockRankHchan)	//初始化读写保护锁

    return c
}

这段代码的作用是创建一个 channel,并初始化 channel 中的各个字段。

  1. 计算总内存大小:每个元素占用空间是t.elem.size,channel的容量是size,所需要分配的总内存大小为mem
  2. 根据mem的值判断是否需要分配内存:分为 无缓冲型、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel;
    • 倘若为无缓冲型channel,则仅申请一个大小为默认值 hchanSize即96 的空间;
    • 如若有缓冲的 struct 型channel,则一次性分配好 96 + mem 大小的空间,并且调整 chan 的 buf 指向 mem 的起始位置;
    • 倘若为有缓冲的 pointer 型channel,则分别申请 chan 和 buf 的空间,两者无需连续;
  3. 初始化channel:设置elemsize, elemtype, dataqsiz, lock等字段。其中elemsize标识每个元素在内存中占用的字节数,elemType包含元素类型(reflect.Type),dataqsiz存放队列中元素的数量上限(若是无缓冲通道,则默认为1), lock压缩对chan的读写操作进行保护的锁。
  4. 最后返回创建的channel的指针c。

3.channel写操作实现原理

3.1 channel写异常处理

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    
    // ...
  • 对于未初始化即为空的 chan,写入操作会引发死锁“unreachable”;
  • 对于已关闭的 chan,写入操作会引发 panic"send on closed channel";

3.2 channel写时存在阻塞读协程——此时环形缓冲区内元素个数为0

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
	
    lock(&c.lock)	// 加锁

    // ...
	//从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog
    if sg := c.recvq.dequeue(); sg != nil {
		//在 send 方法中,基于 memmove 方法,直接将元素拷贝交给 sudog 对应的读协程sg,并完成解锁动作
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    
    // ...

写入前利用channel 的lock进行加锁,如果在channel写入时,如果 channel 中存在阻塞的读协程,那么此时channel内一定没有元素,于是将这个读携程 唤醒,并为了提高效率,直接将要发送的数据传递给它,而不需要存储到缓冲区中。
在这里插入图片描述

3.3channel写时无阻塞读协程且环形缓冲区仍有空间

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)	//加锁
    // ...
    if c.qcount < c.dataqsiz {	//判断环形缓冲区是否有空间
        qp := chanbuf(c, c.sendx)	//将当前元素添加到环形缓冲区 sendx 对应的位置
        //memmove(dst, src, t.size) 进行数据的转移,本质上是一个内存拷贝
        //将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // ...
}

写入前利用channel 的lock进行加锁,若channel写时无阻塞读协程且环形缓冲区仍有空间,则此时可以直接写入channel中,即直接将当前元素添加到环形缓冲区 sendx 对应的位置,并sendx++,qcount++并解锁,返回。
在这里插入图片描述

3.4 channel写时无阻塞读协程但环形缓冲区无空间

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)	//加锁

    // ...
    //构造封装当前 goroutine 的 sudog 对象,建立 sudog、goroutine、channel 之间的指向关系
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.g = gp
    mysg.c = c
    gp.waiting = mysg
    //把 sudog 添加到当前 channel 的阻塞写协程队列中
    c.sendq.enqueue(mysg)
    
    //park 当前协程
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    //倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走)
    gp.waiting = nil
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

写入前利用channel 的lock进行加锁,若channel写时无阻塞读协程且环形缓冲区无空间,则此时不能写入缓冲区,需要将当前协程加入阻塞写协程队列中,等待被读协程唤醒。在被唤醒时对应的元素必然已经被读协程取走(具体可以看下一章读流程:读时有阻塞的写协程),故可直接清除占用空间。在这里插入图片描述

3.5 channel写流程总结

在这里插入图片描述

  1. 首先判断通道是否为nil即未初始化,若为空则引发死锁
  2. 若通道非空,由于channel是共享资源,故需要对通道进行lock加锁
  3. 继续判断通道是否关闭,若关闭,则引发panic:send on closed channel
  4. 通道非空未关闭,则正式进入写入流程,首先判断是否有阻塞的读协程
    • 若有阻塞的读协程,此时环形缓冲区内元素个数为0, 则唤醒读携程,直接将要发送的数据传递给它,并完成写入,进行解锁返回
    • 没有阻塞的读协程,则判断环形缓冲区是否有空间
      • 若环形缓冲区有空间,则直接将当前元素添加到环形缓冲区 sendx的位置,并更新写入位置sendx与通道元素个数qcount,解锁后返回函数。
      • 若环形缓冲区无空间,将当前协程加入阻塞写协程队列中,阻塞协程,等待被读协程唤醒,并完成解锁

4. channel读操作实现原理

4.1 channel读异常处理:读空 channel

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // ...
}

如上所示,若想要读一个没有初始化的空channel,调用 runtime.gopark 挂起当前 Goroutine,引起死锁"unreachable";

4.2 读时channel已关闭且环形缓冲区内部无元素

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  
    lock(&c.lock)
 // ...
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
            //typedmemclr(ptr, size):从 ptr 开始的地址上清空 size 字节的数据,将要清空的内存空间设置成数据类型的零值。
            	// Channel 已经关闭并且缓冲区没有任何数据,返回c.elemtype的零值
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } 

    // ...

如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接解锁返回零值。
对于 Channel 已经关闭但缓冲区有数据的处理会在后续判断中进行。

4.3 读时有阻塞的写协程——环形缓冲区为无缓冲型或已被写满


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   	//加锁;
    lock(&c.lock)
	 // ...
	 //从阻塞写协程队列中获取到一个写协程
    if sg := c.sendq.dequeue(); sg != nil {
		//从发送队列中出队一个 sg,并通过 recv 函数将 sg 中的数据写入到接收端点 ep 中
		//recv函数内部会进行大量处理:
		//若 channel为无缓冲型,则直接读取写协程元素,并唤醒写协程;
		//若 channel 为有缓冲型,则读取缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写协程,更新读写索引;
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
     }
     // ...
}

读时如果有阻塞的写协程,那么环形缓冲区一定为无缓冲型或已被写满,此时调用recv函数,调用后结果如下:

  • 倘若 channel为无缓冲型,则直接读取写协程元素,并唤醒写协程;
  • 倘若 channel 为有缓冲型,则读取缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写协程,更新读写索引;

recv函数大致流程:
1.如果 sudog 指针 sg 为 nil,则说明当前接收操作没有目标元素,这种情况通常发生在 select 中的非阻塞接收操作或 buffered channel 的读取操作中。
2.如果 channel 中有缓冲数据或者存在未处理的发送操作,则直接将数据从 channel 的缓冲区或发送队列中取出,并将其写入该 sudog 内指定的目标内存地址中。
3.如果 channel 中没有缓冲数据且不存在未处理的发送操作,则创建员工新的 sudog 结构体,将接收请求加入到链表中,同时调度当前 goroutine 进入睡眠状态,等待其他 goroutine 的发送操作唤醒。
4.当唤醒时,检查发送队列中是否有匹配这个接收端点的发送端点:若是,则从发送端点中获取目标元素,将其写入到指定的目标内存处,然后解除所有阻塞并返回;若否,则继续睡眠,等待其他发送操作的唤醒。

在这里插入图片描述

4.4 读时无阻塞写协程且缓冲区有元素

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // ...
    //加锁;
    lock(&c.lock)
    // ...
    if c.qcount > 0 {
        // 获取到 recvx 对应位置的元素
        qp := chanbuf(c, c.recvx)
        if ep != nil {
        	//typedmemmove(dst, src, size):从 src 指向的地址复制 size 字节的数据到 dst 指向的地址。
        	//将channel缓冲区或发送队列中读取到的目标元素(即 qp 指针)写入到接收端点的目标内存地址(即 ep 指针)中
            typedmemmove(c.elemtype, ep, qp)
        }
        //typedmemclr(ptr, size):从 ptr 开始的地址上清空 size 字节的数据,将要清空的内存空间设置成数据类型的零值。
        //清空刚才从 channel 缓冲区或发送队列中取出的元素
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    // ...

读时无阻塞写协程且缓冲区有元素,为一般情况,则直接读取环形缓冲区对应的元素
在这里插入图片描述

4.5 读时无阻塞写协程且缓冲区无元素

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // ...
   //加锁
   lock(&c.lock)
   // ...
   //构造封装当前 goroutine 的 sudog 对象
    gp := getg()
    mysg := acquireSudog()
    //完成指针指向,建立 sudog、goroutine、channel 之间的指向关系
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    gp.param = nil
    //把 sudog 添加到当前 channel 的阻塞读协程队列中
    c.recvq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
     //park 挂起当前读协程
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	//倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被写入)
    gp.waiting = nil
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    //解锁,返回
    return true, success
}

读时无阻塞写协程且缓冲区无元素,那么直接通过 gopark 函数,将当前 goroutine 驱动进入休眠状态,等待其他 写goroutine push 数据、close channel 或者 delete 当前 goroutine 的唤醒,被唤醒后数据已被其他协程处理,故直接回收空间。

4.6 channel读流程总结

在这里插入图片描述

  1. 首先判断通道是否为nil即未初始化,若为空则引发死锁
  2. 若通道非空,由于channel是共享资源,故需要对通道进行lock加锁
  3. 继续判断通道是否关闭,若关闭,则判断环形缓冲区是否有元素,若无元素,则返回对应元素的零值。
  4. 通道非空未关闭,则正式进入写入流程,首先判断是否有阻塞的写协程
    • 若有阻塞的写协程, 说明环形缓冲区为无缓冲型或已被写满,故判断channel是否为无缓冲型
      • 若 channel为无缓冲型,则直接读取写协程元素,并唤醒写协程;
      • 若 channel 为有缓冲型,则读取环形缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写协程,更新读写索引;
    • 若没有阻塞的写协程,则判断环形缓冲区是否有空间
      • 若环形缓冲区有空间,则直接将当前元素添加到环形缓冲区 sendx的位置,并更新写入位置sendx与通道元素个数qcount,解锁后返回函数。
      • 若环形缓冲区无空间,将当前协程加入阻塞写协程队列中,阻塞协程,等待被读协程唤醒,并完成解锁

4.7 两种读 channel 的协议

读取 channel 时,我们会发现若通道关闭且无元素会返回零值,故我们需要判断进行读channel时是真的读到零值还是由于通道关闭读到零值,故源码中定义了两种读 channel 的协议。分别如下:

got1 := <- ch
got2,ok := <- ch

根据第二个 bool 型的返回值用以判断当前 channel 是否已处于关闭状态,若ok为false,则说明通道已经关闭并且缓冲区为空。
在两种格式下,读 channel 操作会被汇编成不同的方法:

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

5.阻塞与非阻塞模式

5.1 阻塞与非阻塞模式概述

阻塞和非阻塞是指在访问资源时等待结果的两种方式,它们之间的主要区别在于在等待调用完成返回时,程序是否能继续执行其他操作。

  • 阻塞是指当进程请求一个 I/O 操作时(比如读或写磁盘文件),如果该设备还没有准备好读写数据,则调用的进程将被挂起并且继续排队等待,直到读或写操作成功完成。阻塞操作会一直占用进程资源,直到得到所需要的结果为止。

  • 而非阻塞调用的作用和阻塞调用的结果是完全相同的,执行后立即返回一个状态码来表示操作的成功或失败。如果操作不能立即执行,则不会等待,而是返回失败并告诉应用程序可以稍后再次尝试。

channel 操作默认情况下是阻塞的,这意味着当执行 <- channel 读取或者 channel <- value 写入语句时,程序会一直等待,直到某个 goroutine 从该 channel 接收到数据或者有其他 goroutine 将数据发送到该通道中。
而在使用 select 语句的时候,默认的行为是非阻塞的,即当所有分支都无法立刻执行时,select 会立即返回,而不是阻塞等待,这样就给了我们利用分支的互斥性和阻塞逻辑设计非阻塞 IO 的能力。

ch := make(chan int)
select{
  case <- ch:
  default:
}

5.2 非阻塞模式逻辑

在上述源码解读时,可以看到写操作函数chansend与读操作函数chanrecv都有一个参数:block bool,不过在源码中进行了精简,故对于block 的作用没有体现。
非阻塞模式下,在读/写 channel 方法都会通过这个 bool 型的响应参数block ,用以标识是否读取/写入成功.
• 能立即完成读取/写入操作的条件下,非阻塞模式下会返回 true.
• 使得当前 goroutine 进入死锁或需要被挂起的操作,在非阻塞模式下会返回 false;

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
    return chanrecv(c, elem, false)
}

在 select 语句包裹的多路复用分支中,读和写 channel 操作会被汇编为 selectnbrecv 和 selectnbsend 方法,底层同样复用 chanrecv 和 chansend 方法,但此时由于第三个入参 block 被设置为 false,导致后续会走进非阻塞的处理分支.

6.关闭channel流程

func closechan(c *hchan) {
    if c == nil {		//关闭未初始化过的 channel 会 panic;
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)//加锁
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))//重复关闭 channel 会 panic
    }

    c.closed = 1

    var glist gList
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        glist.push(gp)		//将阻塞读协程队列中的协程节点统一添加到 glist
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        glist.push(gp)			//将阻塞写协程队列中的协程节点统一添加到 glist
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)	// 唤醒 glist 当中的所有协程

在这里插入图片描述

  1. 首先判断通道是否为nil即未初始化,若关闭空channel则引发panic(plainError(“close of nil channel”))
  2. 若通道非空,由于channel是共享资源,故需要对通道进行lock加锁
  3. 继续判断通道是否关闭,若已经关闭,则引发panic(plainError(“close of closed channel”))
  4. 通道非空未关闭,则正式进入关闭流程:
    • 若有阻塞读协程队列,则将阻塞读协程队列中的协程节点统一添加到 glist,此时一定无阻塞写协程队列
    • 若有阻塞写协程队列,则将阻塞写协程队列中的协程节点统一添加到 glist,此时一定无阻塞读协程队列
    • 唤醒 glist 当中的所有协程.
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。