您现在的位置是:首页 >其他 >go 并发/并行/协程/sync锁读写锁网站首页其他

go 并发/并行/协程/sync锁读写锁

Michaelwubo 2024-08-18 00:01:07
简介go 并发/并行/协程/sync锁读写锁

下面来介绍几个概念:

进程/线程:

  • 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
  • 线程是进程的一个执行实体,是 CPU 调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
  • 一个进程可以创建和撤销多个线程,同一个进程中的多个线程之间可以并发执行。

并发/并行

  • 多线程程序在单核心的 cpu 上运行,称为并发
  • 多线程程序在多核心的 cpu 上运行,称为并行
  • 并发与并行并不相同,并发主要由切换时间片来实现“同时”运行“
  • 并行则是直接利用多核实现多线程的运行,Go程序可以设置使用核心数,以发挥多核计算机的能力。

协程/线程

  • 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。
  • 线程:一个线程上可以跑多个协程,协程是轻量级的线程。
  • 优雅的并发编程范式,完善的并发支持,出色的并发性能是Go语言区别于其他语言的一大特色。使用Go语言开发服务器程序时,就需要对它的并发机制有深入的了解。

Goroutine 介绍

  • goroutine(协程也就是任务) 是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务,它是Go语言并发设计的核心。
  • 说到底 goroutine 其实就是线程,但是它比线程更小,十几个 goroutine 可能体现在底层就是五六个线程,而且Go语言内部也实现了 goroutine 之间的内存共享。
  • 使用 go 关键字就可以创建 goroutine,将 go 声明放到一个需调用的函数之前,在相同地址空间调用运行这个函数,这样该函数执行时便会作为一个独立的并发线程,这种线程在Go语言中则被称为 goroutine(协程)。

goroutine 的用法如下:

//go 关键字放在方法调用前新建一个 goroutine 并执行方法体
go GetThingDone(param1, param2);
//新建一个匿名方法并执行
go func(param1, param2) {
}(val1, val2)
//直接新建一个 goroutine 并在 goroutine 中执行代码块
go {
    //do someting...
}

因为 goroutine 在多核 cpu 环境下是并行的,如果代码块在多个 goroutine 中执行,那么我们就实现了代码的并行。

如果需要了解程序的执行情况,怎么拿到并行的结果呢?需要配合使用channel进行。

channel

channel 是Go语言在语言级别提供的 goroutine 间的通信方式。我们可以使用 channel 在两个或多个 goroutine 之间传递消息。

channel 是进程内的通信方式,因此通过 channel 传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。如果需要跨进程通信,我们建议用分布式系统的方法来解决,比如使用 Socket 或者 HTTP 等通信协议。Go语言对于网络方面也有非常完善的支持。

channel 是类型相关的,也就是说,一个 channel 只能传递一种类型的值,这个类型需要在声明 channel 时指定。如果对 Unix 管道有所了解的话,就不难理解 channel,可以将其认为是一种类型安全的管道。

定义一个 channel 时,也需要定义发送到 channel 的值的类型,注意,必须使用 make 创建 channel,代码如下所示:

ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{})

回到在 Windows 和 Linux 出现之前的古老年代,在开发程序时并没有并发的概念,因为命令式程序设计语言是以串行为基础的,程序会顺序执行每一条指令,整个程序只有一个执行上下文,即一个调用栈,一个堆。

并发则意味着程序在运行时有多个执行上下文,对应着多个调用栈。我们知道每一个进程在运行时,都有自己的调用栈和堆,有一个完整的上下文,而操作系统在调度进程的时候,会保存被调度进程的上下文环境,等该进程获得时间片后,再恢复该进程的上下文到系统中。
从整个操作系统层面来说,多个进程是可以并发的,那么并发的价值何在?下面我们先看以下几种场景。

1) 一方面我们需要灵敏响应的图形用户界面,一方面程序还需要执行大量的运算或者 IO 密集操作,而我们需要让界面响应与运算同时执行。
2) 当我们的 Web 服务器面对大量用户请求时,需要有更多的“Web 服务器工作单元”来分别响应用户。
3) 我们的事务处于分布式环境上,相同的工作单元在不同的计算机上处理着被分片的数据,计算机的 CPU 从单内核(core)向多内核发展,而我们的程序都是串行的,计算机硬件的能力没有得到发挥。
4) 我们的程序因为 IO 操作被阻塞,整个程序处于停滞状态,其他 IO 无关的任务无法执行。

从以上几个例子可以看到,串行程序在很多场景下无法满足我们的要求。下面我们归纳了并发程序的几条优点,让大家认识到并发势在必行:

  • 并发能更客观地表现问题模型;
  • 并发可以充分利用 CPU 核心的优势,提高程序的执行效率;
  • 并发能充分利用 CPU 与其他硬件设备固有的异步性。

在编写 Socket 网络程序时,需要提前准备一个线程池为每一个 Socket 的收发包分配一个线程。开发人员需要在线程数量和 CPU 数量间建立一个对应关系,以保证每个任务能及时地被分配到 CPU 上进行处理,同时避免多个任务频繁地在线程间切换执行而损失效率。
虽然,线程池为逻辑编写者提供了线程分配的抽象机制。但是,如果面对随时随地可能发生的并发和线程处理需求,线程池就不是非常直观和方便了。能否有一种机制:使用者分配足够多的任务,系统能自动帮助使用者把任务分配到 CPU 上,让这些任务尽量并发运作。这种机制在 Go语言中被称为 goroutine
goroutine 是 Go语言中的轻量级线程实现,由 Go 运行时(runtime)管理。Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。
Go 程序从 main 包的 main() 函数开始,在程序启动时,Go 程序就会为 main() 函数创建一个默认的 goroutine。

使用普通函数创建 goroutine

Go 程序中使用 go 关键字为一个函数创建一个 goroutine。一个函数可以被创建多个 goroutine,一个 goroutine 必定对应一个函数。

1) 格式

为一个普通函数创建 goroutine 的写法如下:

go 函数名( 参数列表 )

  • 函数名:要调用的函数名。
  • 参数列表:调用函数需要传入的参数。

使用 go 关键字创建 goroutine 时,被调用函数的返回值会被忽略。
如果需要在 goroutine 中返回数据,请使用后面介绍的通道(channel)特性,通过通道把数据从 goroutine 中作为返回值传出。

2) 例子

使用 go 关键字,将 running() 函数并发执行,每隔一秒打印一次计数器,而 main 的 goroutine 则等待用户输入,两个行为可以同时进行。请参考下面代码:

package main
import (
    "fmt"
    "time"
)
func running() {
    var times int
    // 构建一个无限循环
    for {
        times++
        fmt.Println("tick", times)
        // 延时1秒
        time.Sleep(time.Second)
    }
}
func main() {
    // 并发执行程序
    go running()
    // 接受命令行输入, 不做任何事情
    var input string
    fmt.Scanln(&input)
}

这个例子中,Go 程序在启动时,运行时(runtime)会默认为 main() 函数创建一个 goroutine。在 main() 函数的 goroutine 中执行到 go running 语句时,归属于 running() 函数的 goroutine 被创建,running() 函数开始在自己的 goroutine 中执行。此时,main() 继续执行,两个 goroutine 通过 Go 程序的调度机制同时运作。

使用匿名函数创建goroutine

go 关键字后也可以为匿名函数或闭包启动 goroutine。

1) 使用匿名函数创建goroutine的格式

使用匿名函数或闭包创建 goroutine 时,除了将函数定义部分写在 go 的后面之外,还需要加上匿名函数的调用参数,格式如下:

go func( 参数列表 ){
    函数体
}( 调用参数列表 )

其中:

  • 参数列表:函数体内的参数变量列表。
  • 函数体:匿名函数的代码。
  • 调用参数列表:启动 goroutine 时,需要向匿名函数传递的调用参数。

2) 使用匿名函数创建goroutine的例子

在 main() 函数中创建一个匿名函数并为匿名函数启动 goroutine。匿名函数没有参数。代码将并行执行定时打印计数的效果。参见下面的代码:

package main
import (
    "fmt"
    "time"
)
func main() {
    go func() {
        var times int
        for {
            times++
            fmt.Println("tick", times)
            time.Sleep(time.Second)
        }
    }()
    var input string
    fmt.Scanln(&input)
}

提示

所有 goroutine 在 main() 函数结束时会一同结束。

goroutine 虽然类似于线程概念,但是从调度性能上没有线程细致,而细致程度取决于 Go 程序的 goroutine 调度器的实现和运行环境。

终止 goroutine 的最好方法就是自然返回 goroutine 对应的函数。虽然可以用 golang.org/x/net/context 包进行 goroutine 生命期深度控制,但这种方法仍然处于内部试验阶段,并不是官方推荐的特性。

截止 Go 1.9 版本,暂时没有标准接口获取 goroutine 的 ID。

事实上,不管是什么平台,什么编程语言,不管在哪,并发都是一个大话题。并发编程的难度在于协调,而协调就要通过交流,从这个角度看来,并发单元间的通信是最大的问题。

在工程上,有两种最常见的并发通信模型:共享数据和消息

  • 共享数据是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享。被共享的数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。在实际工程应用中最常见的无疑是内存了,也就是常说的共享内存。
  • 代码案例如下
package main
import (
    "fmt"
    "runtime"
    "sync"
)
var counter int = 0
func Count(lock *sync.Mutex) {
    lock.Lock()
    counter++
    fmt.Println(counter)
    lock.Unlock()
}
func main() {
    lock := &sync.Mutex{}
    for i := 0; i < 10; i++ {
        go Count(lock)
    }
    for {
        lock.Lock()
        c := counter
        lock.Unlock()
        runtime.Gosched()
        if c >= 10 {
            break
        }
    }
}

在上面的例子中,我们在 10 个 goroutine 中共享了变量 counter。每个 goroutine 执行完成后,会将 counter 的值加 1。因为 10 个 goroutine 是并发执行的,所以我们还引入了锁,也就是代码中的 lock 变量。每次对 n 的操作,都要先将锁锁住,操作完成后,再将锁打开。
在 main 函数中,使用 for 循环来不断检查 counter 的值(同样需要加锁)。当其值达到 10 时,说明所有 goroutine 都执行完毕了,这时主函数返回,程序退出。
事情好像开始变得糟糕了。实现一个如此简单的功能,却写出如此臃肿而且难以理解的代码。想象一下,在一个大的系统中具有无数的锁、无数的共享变量、无数的业务逻辑与错误处理分支,那将是一场噩梦。这噩梦就是众多 C/C++开发者正在经历的,其实java 和C#开发者也好不到哪里去。
Go语言既然以并发编程作为语言的最核心优势,当然不至于将这样的问题用这么无奈的方式来解决。Go语言提供的是另一种通信模型,即以消息机制而非共享内存作为通信方式。

消息机制认为每个并发单元是自包含的、独立的个体,并且都有自己的变量,但在不同并发单元间这些变量不共享。每个并发单元的输入和输出只有一种,那就是消息。这有点类似于进程的概念,每个进程不会被其他进程打扰,它只做好自己的工作就可以了。不同进程间靠消息来通信,它们不会共享内存。
Go语言提供的消息通信机制被称为 channel.

有并发或并行,就有资源竞争,如果两个或者多个 goroutine 在没有相互同步的情况下,访问某个共享的资源,比如同时对该资源进行读写时,就会处于相互竞争的状态,这就是并发中的资源竞争

package main
import (
    "fmt"
    "runtime"
    "sync"
)
var (
    count int32
    wg    sync.WaitGroup
)
func main() {
    wg.Add(2)
    go incCount()
    go incCount()
    wg.Wait()
    fmt.Println(count)
}
func incCount() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
        value := count
        runtime.Gosched()
        value++
        count = value
    }
}

这是一个资源竞争的例子,大家可以将程序多运行几次,会发现结果可能是 2,也可以是 3,还可能是 4。这是因为 count 变量没有任何同步保护,所以两个 goroutine 都会对其进行读写,会导致对已经计算好的结果被覆盖,以至于产生错误结果。

代码中的 runtime.Gosched() 是让当前 goroutine 暂停的意思,退回执行队列,让其他等待的 goroutine 运行,目的是为了使资源竞争的结果更明显。

下面我们来分析一下程序的运行过程,将两个 goroutine 分别假设为 g1 和 g2:

  • g1 读取到 count 的值为 0;
  • 然后 g1 暂停了,切换到 g2 运行,g2 读取到 count 的值也为 0;
  • g2 暂停,切换到 g1,g1 对 count+1,count 的值变为 1;
  • g1 暂停,切换到 g2,g2 刚刚已经获取到值 0,对其 +1,最后赋值给 count,其结果还是 1;
  • 可以看出 g1 对 count+1 的结果被 g2 给覆盖了,两个 goroutine 都 +1 而结果还是 1。

通过上面的分析可以看出,之所以出现上面的问题,是因为两个 goroutine 相互覆盖结果。

所以我们对于同一个资源的读写必须是原子化的,也就是说,同一时间只能允许有一个 goroutine 对共享资源进行读写操作。

共享资源竞争的问题,非常复杂,并且难以察觉,好在 Go 为我们提供了一个工具帮助我们检查,这个就是go build -race 命令。在项目目录下执行这个命令,生成一个可以执行文件,然后再运行这个可执行文件,就可以看到打印出的检测信息。

[root@git jettech]# go build   -race  main.go 

[root@git jettech]# ./main 
==================
WARNING: DATA RACE
Read at 0x000000605860 by goroutine 8:
  main.incCount()
      /root/go/jettech/main.go:22 +0x7c

Previous write at 0x000000605860 by goroutine 7:
  main.incCount()
      /root/go/jettech/main.go:25 +0x9b

Goroutine 8 (running) created at:
  main.main()
      /root/go/jettech/main.go:14 +0x7c

Goroutine 7 (finished) created at:
  main.main()
      /root/go/jettech/main.go:13 +0x64
==================
4
Found 1 data race(s)

通过运行结果可以看出 goroutine 8 在代码 25 行读取共享资源value := count,而这时 goroutine 7 在代码 28 行修改共享资源count = value,而这两个 goroutine 都是从 main 函数的 16、17 行通过 go 关键字启动的。

锁住共享资源

Go语言提供了传统的同步 goroutine 的机制,就是对共享资源加锁。atomic 和 sync 包里的一些函数就可以对共享的资源进行加锁操作

原子函数

原子函数能够以很底层的加锁机制来同步访问整型变量和指针,示例代码如下所示:

package main
import (
        "fmt"
        "runtime"
        "sync"
        "sync/atomic"
)
var(
        counter int64
        wg sync.WaitGroup
)
func main() {
        wg.Add(2)
        go incCount()
        go incCount()
        wg.Wait()
        fmt.Println(counter)

}
func incCount() {
        defer wg.Done()
        for count:=0;count<2;count++ {
                atomic.AddInt64(&counter,1)
                runtime.Gosched()
        }
}

上述代码中使用了 atmoic 包的 AddInt64 函数,这个函数会同步整型值的加法,方法是强制同一时刻只能有一个 gorountie 运行并完成这个加法操作。当 goroutine 试图去调用任何原子函数时,这些 goroutine 都会自动根据所引用的变量做同步处理。

另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式。下面是代码就使用了 LoadInt64 和 StoreInt64 函数来创建一个同步标志,这个标志可以向程序里多个 goroutine 通知某个特殊状态。

package main
import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)
var (
    shutdown int64
    wg       sync.WaitGroup
)
func main() {
    wg.Add(2)
    go doWork("A")
    go doWork("B")
    time.Sleep(1 * time.Second)
    fmt.Println("Shutdown Now")
    atomic.StoreInt64(&shutdown, 1)
    wg.Wait()
}
func doWork(name string) {
    defer wg.Done()
    for {
        fmt.Printf("Doing %s Work
", name)
        time.Sleep(250 * time.Millisecond)
        if atomic.LoadInt64(&shutdown) == 1 {
            fmt.Printf("Shutting %s Down
", name)
            break
        }
    }
}

上面代码中 main 函数使用 StoreInt64 函数来安全地修改 shutdown 变量的值。如果哪个 doWork goroutine 试图在 main 函数调用 Stor

eInt64 的同时调用 LoadInt64 函数,那么原子函数会将这些调用互相同步,保证这些操作都是安全的,不会进入竞争状态

互斥锁

另一种同步访问共享资源的方式是使用互斥锁,互斥锁这个名字来自互斥的概念。互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以执行这个临界代码。

package main
import (
    "fmt"
    "runtime"
    "sync"
)
var (
    counter int64
    wg      sync.WaitGroup
    mutex   sync.Mutex
)
func main() {
    wg.Add(2)
    go incCounter(1)
    go incCounter(2)
    wg.Wait()
    fmt.Println(counter)
}
func incCounter(id int) {
    defer wg.Done()
    for count := 0; count < 2; count++ {
        //同一时刻只允许一个goroutine进入这个临界区
        mutex.Lock()
        {
            value := counter
            runtime.Gosched()
            value++
            counter = value
        }
        mutex.Unlock() //释放锁,允许其他正在等待的goroutine进入临界区
    }
}

同一时刻只有一个 goroutine 可以进入临界区。之后直到调用 Unlock 函数之后,其他 goroutine 才能进去临界区。当调用 runtime.Gosched 函数强制将当前 goroutine 退出当前线程后,调度器会再次分配这个 goroutine 继续运行。

在 Go语言程序运行时(runtime)实现了一个小型的任务调度器。这套调度器的工作原理类似于操作系统调度线程,Go 程序调度器可以高效地将 CPU 资源分配给每一个任务。传统逻辑中,开发者需要维护线程池中线程与 CPU 核心数量的对应关系。同样的,Go 地中也可以通过 runtime.GOMAXPROCS() 函数做到,格式为:

runtime.GOMAXPROCS(逻辑CPU数量)

这里的逻辑CPU数量可以有如下几种数值:

  • <1:不修改任何数值。
  • =1:单核心执行。
  • >1:多核并发执行。

一般情况下,可以使用 runtime.NumCPU() 查询 CPU 数量,并使用 runtime.GOMAXPROCS() 函数进行设置,例如:

  1. runtime.GOMAXPROCS(runtime.NumCPU())

Go 1.5 版本之前,默认使用的是单核心执行。从 Go 1.5 版本开始,默认执行上面语句以便让代码并发执行,最大效率地利用 CPU。

GOMAXPROCS 同时也是一个环境变量,在应用程序启动前设置环境变量也可以起到相同的作用。

在讲解并发概念时,总会涉及另外一个概念并行。下面让我们来了解并发和并行之间的区别。

  • 并发(concurrency):把任务在不同的时间点交给处理器进行处理。在同一时间点,任务并不会同时运行。
  • 并行(parallelism):把每一个任务分配给每一个处理器独立完成。在同一时间点,任务一定是同时运行。

并发不是并行。并行是让不同的代码片段同时在不同的物理处理器上执行。并行的关键是同时做很多事情,而并发是指同时管理很多事情,这些事情可能只做了一半就被暂停去做别的事情了。
在很多情况下,并发的效果比并行好,因为操作系统和硬件的总资源一般很少,但能支持系统同时做很多事情。这种“使用较少的资源做更多的事情”的哲学,也是指导 Go语言设计的哲学。
如果希望让 goroutine 并行,必须使用多于一个逻辑处理器。当有多个逻辑处理器时,调度器会将 goroutine 平等分配到每个逻辑处理器上。这会让 goroutine 在不同的线程上运行。不过要想真的实现并行的效果,用户需要让自己的程序运行在有多个物理处理器的机器上。否则,哪怕 Go语言运行时使用多个线程,goroutine 依然会在同一个物理处理器上并发运行,达不到并行的效果。
下图展示了在一个逻辑处理器上并发运行 goroutine 和在两个逻辑处理器上并行运行两个并发的 goroutine 之间的区别。调度器包含一些聪明的算法,这些算法会随着 Go语言的发布被更新和改进,所以不推荐盲目修改语言运行时对逻辑处理器的默认设置。如果真的认为修改逻辑处理器的数量可以改进性能,也可以对语言运行时的参数进行细微调整。

Go语言在 GOMAXPROCS 数量与任务数量相等时,可以做到并行执行,但一般情况下都是并发执行。

channelle

如果说 goroutine 是 Go语言程序的并发体的话,那么 channels 就是它们之间的通信机制。一个 channels 是一个通信机制,它可以让一个 goroutine 通过它给另一个 goroutine 发送值信息。每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型。一个可以发送 int 类型数据的 channel 一般写为 chan int。

Go语言提倡使用通信的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。
这里通信的方法就是使用通道(channel),如下图所示。

                                图:goroutine 与 channel 的通信

在地铁站、食堂、洗手间等公共场所人很多的情况下,大家养成了排队的习惯,目的也是避免拥挤、插队导致的低效的资源使用和交换过程。代码与数据也是如此,多个 goroutine 为了争抢数据,势必造成执行的低效率,使用队列的方式是最高效的,channel 就是一种队列一样的结构。

通道的特性

Go语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。

声明通道类型

通道本身需要一个类型进行修饰,就像切片类型需要标识元素类型。通道的元素类型就是在其内部传输的数据类型,声明如下:

var 通道变量 chan 通道类型

  • 通道类型:通道内的数据类型。
  • 通道变量:保存通道的变量。

chan 类型的空值是 nil,声明后需要配合 make 后才能使用。

创建通道

通道是引用类型,需要使用 make 进行创建,格式如下:

通道实例 := make(chan 数据类型)

  • 数据类型:通道内传输的元素类型。
  • 通道实例:通过make创建的通道句柄。


请看下面的例子:

ch1 := make(chan int)                 // 创建一个整型类型的通道
ch2 := make(chan interface{})         // 创建一个空接口类型的通道, 可以存放任意格式
type Equip struct{ /* 一些字段 */ }
ch2 := make(chan *Equip)             // 创建Equip指针类型的通道, 可以存放*Equip

使用通道发送数据

通道创建后,就可以使用通道进行发送和接收操作。

1) 通道发送数据的格式

通道的发送使用特殊的操作符<-,将数据通过通道发送的格式为:

通道变量 <- 值

  • 通道变量:通过make创建好的通道实例。
  • 值:可以是变量、常量、表达式或者函数返回值等。值的类型必须与ch通道的元素类型一致。

2) 通过通道发送数据的例子

使用 make 创建一个通道后,就可以使用<-向通道发送数据,代码如下:

// 创建一个空接口通道
ch := make(chan interface{})
// 将0放入通道中
ch <- 0
// 将hello字符串放入通道中
ch <- "hello"

3) 发送将持续阻塞直到数据被接收

把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示,代码如下:

package main
func main() {
    // 创建一个整型通道
    ch := make(chan int)
    // 尝试将0通过通道发送
    ch <- 0
}

报错的意思是:运行时发现所有的 goroutine(包括main)都处于等待 goroutine。也就是说所有 goroutine 中的 channel 并没有形成发送和接收对应的代码。

使用通道接收数据

通道接收同样使用<-操作符,通道接收有如下特性:
① 通道的收发操作在不同的两个 goroutine 间进行。

由于通道的数据在没有接收方处理时,数据发送方会持续阻塞,因此通道的接收必定在另外一个 goroutine 中进行。

② 接收将持续阻塞直到发送方发送数据。

如果接收方接收时,通道中没有发送方发送数据,接收方也会发生阻塞,直到发送方发送数据为止。

③ 每次接收一个元素。
通道一次只能接收一个数据元素。

通道的数据接收一共有以下 4 种写法。

通道的数据接收一共有以下 4 种写法。

1) 阻塞接收数据

阻塞模式接收数据时,将接收变量作为<-操作符的左值,格式如下:

data := <-ch

执行该语句时将会阻塞,直到接收到数据并赋值给 data 变量。

2) 非阻塞接收数据

使用非阻塞方式从通道接收数据时,语句不会发生阻塞,格式如下:

data, ok := <-ch

  • data:表示接收到的数据。未接收到数据时,data 为通道类型的零值。
  • ok:表示是否接收到数据。


非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。如果需要实现接收超时检测,可以配合 select 和计时器 channel 进行,可以参见后面的内容。

3) 接收任意数据,忽略接收的数据

阻塞接收数据后,忽略从通道返回的数据,格式如下:

<-ch

执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。

使用通道做并发同步的写法,可以参考下面的例子:

package main
import (
    "fmt"
)
func main() {
    // 构建一个通道
    ch := make(chan int)
    // 开启一个并发匿名函数
    go func() {
        fmt.Println("start goroutine")
        // 通过通道通知main的goroutine
        ch <- 0
        fmt.Println("exit goroutine")
    }()
    fmt.Println("wait goroutine")
    // 等待匿名goroutine
    <-ch
    fmt.Println("all done")
}

代码说明如下:

  • 第 10 行,构建一个同步用的通道。
  • 第 13 行,开启一个匿名函数的并发。
  • 第 18 行,匿名 goroutine 即将结束时,通过通道通知 main 的 goroutine,这一句会一直阻塞直到 main 的 goroutine 接收为止。
  • 第 27 行,开启 goroutine 后,马上通过通道等待匿名 goroutine 结束。

4) 循环接收

通道的数据接收可以借用 for range 语句进行多个元素的接收操作,格式如下:

 
  1. for data := range ch {
  2. }

通道 ch 是可以进行遍历的,遍历的结果就是接收到的数据。数据类型就是通道的数据类型。通过 for 遍历获得的变量只有一个,即上面例子中的 data。

遍历通道数据的例子请参考下面的代码。

使用 for 从通道中接收数据:

package main
import (
    "fmt"
    "time"
)
func main() {
    // 构建一个通道
    ch := make(chan int)
    // 开启一个并发匿名函数
    go func() {
        // 从3循环到0
        for i := 3; i >= 0; i-- {
            // 发送3到0之间的数值
            ch <- i
            // 每次发送完时等待
            time.Sleep(time.Second)
        }
    }()
    // 遍历接收通道数据
    for data := range ch {
        // 打印通道数据
        fmt.Println(data)
        // 当遇到数据0时, 退出接收循环
        if data == 0 {
                break
        }
    }
}

代码说明如下:

  • 第 12 行,通过 make 生成一个整型元素的通道。
  • 第 15 行,将匿名函数并发执行。
  • 第 18 行,用循环生成 3 到 0 之间的数值。
  • 第 21 行,将 3 到 0 之间的数值依次发送到通道 ch 中。
  • 第 24 行,每次发送后暂停 1 秒。
  • 第 30 行,使用 for 从通道中接收数据。
  • 第 33 行,将接收到的数据打印出来。
  • 第 36 行,当接收到数值 0 时,停止接收。如果继续发送,由于接收 goroutine 已经退出,没有 goroutine 发送到通道,因此运行时将会触发宕机报错。

前面的例子创建的都是无缓冲通道。使用无缓冲通道往里面装入数据时,装入方将被阻塞,直到另外通道在另外一个 goroutine 中被取出。同样,如果通道中没有放入任何数据,接收方试图从通道中获取数据时,同样也是阻塞。发送和接收的操作是同步完成的

下面通过一个并发打印的例子,将 goroutine 和 channel 放在一起展示它们的用法。

package main
import (
    "fmt"
)
func printer(c chan int) {
    // 开始无限循环等待数据
    for {
        // 从channel中获取一个数据
        data := <-c
        // 将0视为数据结束
        if data == 0 {
            break
        }
        // 打印数据
        fmt.Println(data)
    }
    // 通知main已经结束循环(我搞定了!)
    c <- 0
}
func main() {
    // 创建一个channel
    c := make(chan int)
    // 并发执行printer, 传入channel
    go printer(c)
    for i := 1; i <= 10; i++ {
        // 将数据通过channel投送给printer
        c <- i
    }
    // 通知并发的printer结束循环(没数据啦!)
    c <- 0
    // 等待printer结束(搞定喊我!)
    <-c
}

代码说明如下:

  • 第 10 行,创建一个无限循环,只有当第 16 行获取到的数据为 0 时才会退出循环。
  • 第 13 行,从函数参数传入的通道中获取一个整型数值。
  • 第 21 行,打印整型数值。
  • 第 25 行,在退出循环时,通过通道通知 main() 函数已经完成工作。
  • 第 32 行,创建一个整型通道进行跨 goroutine 的通信。
  • 第 35 行,创建一个 goroutine,并发执行 printer() 函数。
  • 第 37 行,构建一个数值循环,将 1~10 的数通过通道传送给 printer 构造出的 goroutine。
  • 第 44 行,给通道传入一个 0,表示将前面的数据处理完成后,退出循环。
  • 第 47 行,在数据发送过去后,因为并发和调度的原因,任务会并发执行。这里需要等待 printer 的第 25 行返回数据后,才可以退出 main()。

本例的设计模式就是典型的生产者和消费者。生产者是第 37 行的循环,而消费者是 printer() 函数。整个例子使用了两个 goroutine,一个是 main(),一个是通过第 35 行 printer() 函数创建的 goroutine。两个 goroutine 通过第 32 行创建的通道进行通信。这个通道有下面两重功能。

  • 数据传送:第 40 行中发送数据和第 13 行接收数据。
  • 控制指令:类似于信号量的功能。同步 goroutine 的操作。功能简单描述为:
    • 第 44 行:“没数据啦!”
    • 第 25 行:“我搞定了!”
    • 第 47 行:“搞定喊我!”

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。