您现在的位置是:首页 >学无止境 >Go语言通过chan进行数据传递网站首页学无止境

Go语言通过chan进行数据传递

242030 2024-10-12 12:01:03
简介Go语言通过chan进行数据传递

Go语言通过chan进行数据传递

1、并发范式-管道

通道可以分为两个方向,一个是读,另一个是写,假如一个函数的输入参数和输出参数都是相同的 chan 类型,则

该函数可以调用自己,最终形成一个调用链。当然多个具有相同参数类型的函数也能组成一个调用链,这很像

UNIX系统的管道,是一个有类型的管道。下面通过具体的示例演示Go程序这种链式处理能力。

package main

import (
	"fmt"
)

// chain函数输入参数和输入参数类型相同都是chan int类型
// chain函数功能是将chan内的数据统一加1
func chain(in chan int) chan int {
	out := make(chan int)
	go func() {
		for v := range in {
			out <- 1 + v
		}
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	//初始化输入参数
	go func() {
		for i := 0; i < 10; i++ {
			in <- i
		}
		close(in)
	}()
	//连续调用3次chan,想当与把in中的每个元素都加3
	out := chain(chain(chain(in)))
	for v := range out {
		fmt.Println(v)
	}
}
# 程序输出
3
4
5
6
7
8
9
10
11
12

2、一个线程写数据一个线程读数据

一个线程往管道里写数据、另一个线程从管道里读数据示例。

package main

import (
	"fmt"
)

// writerData
func writerData(intChan chan int) {
	for i := 1; i <= 10; i++ {
		//放入数据
		intChan <- i
		fmt.Printf("writeData写到数据=%v
", i)
	}
	// 关闭
	close(intChan)
}

// readData
func readData(intChan chan int, exitChan chan bool) {
	for {
		v, ok := <-intChan
		if !ok {
			break
		}
		fmt.Printf("readData读到数据=%v
", v)
	}
	// readData读完数据后,即任务完成
	exitChan <- true
	close(exitChan)
}

func main() {
	// 创建两个管道
	intChan := make(chan int, 10)
	// 判断子进程是否结束
	exitChan := make(chan bool, 1)
	go writerData(intChan)
	go readData(intChan, exitChan)
	// 注意主线程退出,两个线程直接退出
	for {
		// 等待读取
		_, ok := <-exitChan
		// 没读到就退出
		if !ok {
			break
		}
	}
}
# 程序输出
writeData写到数据=1
writeData写到数据=2
writeData写到数据=3
writeData写到数据=4
writeData写到数据=5
writeData写到数据=6
writeData写到数据=7
writeData写到数据=8
writeData写到数据=9
writeData写到数据=10
readData读到数据=1
readData读到数据=2
readData读到数据=3
readData读到数据=4
readData读到数据=5
readData读到数据=6
readData读到数据=7
readData读到数据=8
readData读到数据=9
readData读到数据=10

3、多线程读写数据

求每个数字的累加和:

package main

import (
	"fmt"
)

// 写入2000个数据到numChan中
func inputNum(numChan chan int) {
	for i := 1; i <= 2000; i++ {
		numChan <- i
	}
	// 写完数据关闭channel
	close(numChan)
}

// 计算每个数字的累加
// 读取数据并且存入到resChan中,exitChan做协程标记
func getNum(numChan chan int, resChan chan int, exitChan chan bool) {
	for {
		res := 0
		n, ok := <-numChan
		// 值被取完
		if !ok {
			break
		}
		for i := 1; i <= n; i++ {
			res += i
		}
		// 存入到resChan
		resChan <- res
	}
	// 标记退出
	exitChan <- true
}

func main() {
	// 创建三个管道分别为读、写、退出标记
	numChan := make(chan int, 2000)
	resChan := make(chan int, 2000)
	exitChan := make(chan bool, 8)
	// 启动多线程
	go inputNum(numChan)
	for i := 1; i <= 8; i++ {
		go getNum(numChan, resChan, exitChan)
	}
	// 再启动一个线程取出exitChan
	go func() {
		for i := 0; i < 8; i++ {
			// 从exitChan管道取出即可
			<-exitChan
		}
		// 全部取出说明读取numChan数据完毕,关闭resChan
		close(resChan)
	}()
	// 读取resChan数据
	for v := range resChan {
		fmt.Println(v)
	}
}
# 程序输出
1
3
6
10
15
21
28
......

多线程求素数:

package main

import (
	"fmt"
	"math"
)

// 放入数据
func putNum(intChan chan int) {
	for i := 2; i < 8000; i++ {
		intChan <- i
	}
	close(intChan)
}

// 判断素数并放入到primeChan中
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
	for {
		// 从管道取出并判断是不是素数
		num, ok := <-intChan
		flag := true
		if !ok {
			// 取出失败
			break
		}
		for i := 2; i < int(math.Sqrt(float64(num))); i++ {
			// 不是素数,退出
			if num%i == 0 {
				flag = false
				break
			}
		}
		if flag {
			// 是素数,加入到管道中
			primeChan <- num
		}
	}
	// 标记管道退出
	exitChan <- true
}

func main() {
	intChan := make(chan int, 1000)
	// 放入结果
	primeChan := make(chan int, 1000)
	// 标记退出
	exitChan := make(chan bool, 4)
	// 开启协程,放入1-8000
	go putNum(intChan)
	// 开启四个协程,并判断是否为素数
	for i := 0; i < 4; i++ {
		go primeNum(intChan, primeChan, exitChan)
	}
	go func() {
		for i := 0; i < 4; i++ {
			// 只从管道里把内容取出来
			<-exitChan
		}
		close(primeChan)
	}()
	for v := range primeChan {
		fmt.Println("素数有", v)
	}
}
# 程序输出
素数有 2
素数有 3
素数有 4
素数有 5
.....

4、并发范式-生成器

本节通过具体的程序示例来演示Go语言强大的并发处理能力,每个示例代表一个并发处理范式,这些范式具有典

型的特征,在真实的程序中稍加改造就能使用。

在应用系统编程中,常见的应用场景就是调用一个统一的全局的生成器服务,用于生成全局事务号、订单号、序列

号和随机数等。Go对这种场景的支持非常简单,下面以一个随机数生成器为例来说明。

4.1 最简单的带缓冲的生成器

package main

import (
	"fmt"
	"math/rand"
)

func GenerateIntA() chan int {
	ch := make(chan int, 5)
	// 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
	go func() {
		for {
			ch <- rand.Int()
		}
	}()
	return ch
}

func main() {
	// 启动生成器
	ch := GenerateIntA()
	fmt.Println(<-ch)
	fmt.Println(<-ch)
}
# 程序输出
5577006791947779410
8674665223082153551

4.2 多个goroutine增强型生成器

package main

import (
	"fmt"
	"math/rand"
)

func GenerateIntA() chan int {
	ch := make(chan int, 5)
	// 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
	go func() {
		for {
			ch <- rand.Int()
		}
	}()
	return ch
}

func GenerateIntB() chan int {
	ch := make(chan int, 5)
	// 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
	go func() {
		for {
			ch <- rand.Int()
		}
	}()
	return ch
}

func GenerateInt() chan int {
	ch := make(chan int, 10)
	go func() {
		for {
			// 使用select的扇入技术增加生成的随机源
			select {
			case ch <- <-GenerateIntA():
			case ch <- <-GenerateIntB():
			}
		}
	}()
	return ch
}

func main() {
	// 启动生成器
	ch := GenerateInt()
	// 获取生成器资源
	for i := 0; i < 10; i++ {
		fmt.Println(<-ch)
	}
}
# 程序输出
5577006791947779410
2933568871211445515
7981306761429961588
3337066551442961397
2050257992909156333
837825985403119657
8267293389953062911
7660323324116104765
273669266008440571
2282476590775666788

4.3 自动退出

有时希望生成器能够自动退出,可以借助Go通道的退出通知机制(close channel to broadcast)实现。

package main

import (
	"fmt"
	"math/rand"
)

// done接收通知推出信号
func GenerateIntA(done chan struct{}) chan int {
	ch := make(chan int)
	go func() {
	Lable:
		for {
			select {
			case ch <- rand.Int():
			case <-done:
				break Lable
			}
		}
		close(ch)
	}()
	return ch
}

func main() {
	// 创建一个作为接收推出信号的chan
	done := make(chan struct{})
	// 启动生成器
	ch := GenerateIntA(done)
	fmt.Println(<-ch)
	fmt.Println(<-ch)
	// 不在需要生成器,通过close chan发送一个通知给生成器
	close(done)
	// 获取生成器资源
	for v := range ch {
		fmt.Println(v)
	}
}
# 程序结果
5577006791947779410
8674665223082153551

4.4 多重特性的生成器

一个融合了并发、缓冲、退出通知等多重特性的生成器。

package main

import (
	"fmt"
	"math/rand"
)

//done 接收通知推出信号
func GenerateIntA(done chan struct{}) chan int {
	ch := make(chan int, 5)
	go func() {
	Lable:
		for {
			select {
			case ch <- rand.Int():
			case <-done:
				break Lable
			}
		}
		close(ch)
	}()
	return ch
}

//done 接收通知推出信号
func GenerateIntB(done chan struct{}) chan int {
	ch := make(chan int, 10)
	go func() {
	Lable:
		for {
			select {
			case ch <- rand.Int():
			case <-done:
				break Lable
			}
		}
		close(ch)
	}()
	return ch
}

// 通过select做了扇入(Fan In)
func GenerateInt(done chan struct{}) chan int {
	ch := make(chan int)
	send := make(chan struct{})
	go func() {
	Lable:
		for {
			select {
			case ch <- <-GenerateIntA(send):
			case ch <- <-GenerateIntB(send):
			case <-done:
				send <- struct{}{}
				send <- struct{}{}
				break Lable
			}
		}
		close(ch)
	}()
	return ch
}

func main() {
	//创建一个作为接收推出信号的chan
	done := make(chan struct{})
	//启动生成器
	ch := GenerateInt(done)
	//获取生成器资源
	for i := 0; i < 10; i++ {
		fmt.Println(<-ch)
	}
	//通知生产者停止生产
	done <- struct{}{}
	fmt.Println("stop gernarate")
}
# 程序输出
5577006791947779410
2015796113853353331
4893789450120281907
1687184559264975024
9093919513921919021
2202916659517317514
26222426471854123
8999011805617471788
4534277910591376951
6607332037155172840
stop gernarate

5、固定worker工作池

服务器编程中使用最多的就是通过线程池来提升服务的并发处理能力。在Go语言编程中,一样可以轻松地构建固

定数目的 goroutines 作为工作线程池。下面还是以计算多个整数的和为例来说明这种并发范式。

程序中除了主要的 main goroutine ,还开启了如下几类 goroutine:

(1)、初始化任务的 goroutine。

(2)、分发任务的 goroutine。

(3)、等待所有 worker 结束通知,然后关闭结果通道的 goroutine。

main 函数负责拉起上述 goroutine,并从结果通道获取最终的结果。

程序采用三个通道,分别是:

(1)、传递 task 任务的通道。

(2)、传递 task 结果的通道。

(3)、接收 worker 处理完任务后所发送通知的通道。

package main

import (
	"fmt"
)

// 工作池的goroutine数目
const (
	NUMBER = 10
)

// 工作任务
type task struct {
	begin  int
	end    int
	result chan<- int
}

// 初始化待处理task chan
func InitTask(taskChan chan<- task, r chan int, p int) {
	qu := p / 10
	mod := p % 10
	high := qu * 10
	for j := 0; j < qu; j++ {
		b := 10*j + 1
		e := 10 * (j + 1)
		// 1-10
		// 11-20
		// ......
		tsk := task{
			begin:  b,
			end:    e,
			result: r,
		}
		taskChan <- tsk
	}
	if mod != 0 {
		tsk := task{
			begin:  high + 1,
			end:    p,
			result: r,
		}
		taskChan <- tsk
	}
	close(taskChan)
}

//读取task chan分发到worker goroutine处理,workers的总的数量是workers
func DistributeTask(taskChan <-chan task, workers int, done chan struct{}) {
	for i := 0; i < workers; i++ {
		go ProcessTask(taskChan, done)
	}
}

//工作goroutine处理具体工作,并将处理结构发送到结果chan
func ProcessTask(taskChan <-chan task, done chan struct{}) {
	for t := range taskChan {
		t.do()
	}
	done <- struct{}{}
}

// 任务处理:计算begin到end的和
// 执行结果写入到结果chan result中
func (t *task) do() {
	sum := 0
	for i := t.begin; i <= t.end; i++ {
		sum += i
	}
	t.result <- sum
}

// 通过done channel来同步等待所有工作goroutine的结束,然后关闭结果chan
func CloseResult(done chan struct{}, resultChan chan int, workers int) {
	for i := 0; i < workers; i++ {
		<-done
	}
	close(done)
	close(resultChan)
}

// 读取结果通道,汇总结果
func ProcessResult(resultChan chan int) int {
	sum := 0
	for r := range resultChan {
		sum += r
	}
	return sum
}

func main() {
	// 多线程数目
	workers := NUMBER
	// 工作通道
	taskChan := make(chan task, 10)
	// 结果通道
	resultChan := make(chan int, 10)
	// worker信号通道
	done := make(chan struct{}, 10)
	// 初始化task的goroutine,计算1000个自然数之和
	go InitTask(taskChan, resultChan, 1000)
	//分发任务在NUMBER个goroutine池
	DistributeTask(taskChan, workers, done)
	//获取各个goroutine处理完任务的通知,并关闭结果通道
	go CloseResult(done, resultChan, workers)
	//通过结果通道处理结果
	sum := ProcessResult(resultChan)
	fmt.Println("sum=", sum)
}
# 程序输出
sum= 500500

程序的逻辑分析:

(1)、构建 task 并发送到 task 通道中。

(2)、分别启动 n 个工作线程,不停地从 task 通道中获取任务,然后将结果写入结果通道。如果任务通道被关闭,

则负责向收敛结果的 goroutine 发送通知,告诉其当前 worker 已经完成工作。

(3)、收敛结果的 goroutine 接收到所有 task 已经处理完毕的信号后,主动关闭结果通道。

(4)、 main 中的函数 ProcessResult 读取并统计所有的结果。

6、使用 WaitGroup实现固工作池

package main

import (
	"fmt"
	"sync"
)

// 工作任务
type task struct {
	begin  int
	end    int
	result chan<- int
}

// 构建task并写入task通道
func InitTask(taskChan chan<- task, r chan int, p int) {
	qu := p / 10
	mod := p % 10
	high := qu * 10
	for j := 0; j < qu; j++ {
		b := 10*j + 1
		e := 10 * (j + 1)
		tsk := task{
			begin:  b,
			end:    e,
			result: r,
		}
		taskChan <- tsk
	}
	if mod != 0 {
		tsk := task{
			begin:  high + 1,
			end:    p,
			result: r,
		}
		taskChan <- tsk
	}
	close(taskChan)
}

//读取task chan,每个task启动一个worker goroutine进行处理,并等待每个task运行完,关闭结果通道
func DistributeTask(taskChan <-chan task, wait *sync.WaitGroup, result chan int) {
	for v := range taskChan {
		wait.Add(1)
		go ProcessTask(v, wait)
	}
	wait.Wait()
	close(result)
}

// goroutine处理具体工作,并将结果发送到结果通道
func ProcessTask(t task, wait *sync.WaitGroup) {
	t.do()
	wait.Done()
}

//任务执行: 计算begin到end的和,执行结果写入结果chan result
func (t *task) do() {
	sum := 0
	for i := t.begin; i <= t.end; i++ {
		sum += i
	}
	t.result <- sum
}

//读取结果通道,汇总结果
func ProcessResult(resultChan chan int) int {
	sum := 0
	for r := range resultChan {
		sum += r
	}
	return sum
}

func main() {
	// 创建任务通道
	taskChan := make(chan task, 10)
	// 创建结果通道
	resultChan := make(chan int, 10)
	// wait用于同步等待任务的执行
	wait := &sync.WaitGroup{}
	// 初始化task的goroutine,计算100个自然数之和
	go InitTask(taskChan, resultChan, 100)
	// 每个task启动一个goroutine进行处理
	go DistributeTask(taskChan, wait, resultChan)
	//通过结果通道获取结果并汇总
	sum := ProcessResult(resultChan)
	fmt.Println("sum=", sum)
}
# 程序输出
sum= 5050

程序的逻辑分析:

(1)、InitTask 函数构建 task 并发送到 task 通道中。

(2)、分发任务函数 DistributeTask 为每个 task 启动一个 goroutine 处理任务,等待其处理完成,然后关闭结

果通道。

(3)、ProcessResult 函数读取并统计所有的结果。

这几个函数分别在不同的 goroutine 中运行,它们通过通道和 sync.WaitGroup 进行通信和同步。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。