您现在的位置是:首页 >技术教程 >Golang程序报错:fatal error: all goroutines are asleep - deadlock网站首页技术教程

Golang程序报错:fatal error: all goroutines are asleep - deadlock

HumbleSwage 2023-05-14 20:00:02
简介Golang程序报错:fatal error: all goroutines are asleep - deadlock


完整的报错信息如下:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.(*WorkerManager).KeepAlive(0xc000088f60)
        /root/go_workspace/studygoup/05.go:66 +0x59
main.(*WorkerManager).StartWorkPool(0xc000088f60)
        /root/go_workspace/studygoup/05.go:61 +0xef
main.main()
        /root/go_workspace/studygoup/05.go:88 +0x47

goroutine 18 [chan receive]:
main.(*Worker).Work(0x0?, 0x0?, 0x0?)
        /root/go_workspace/studygoup/05.go:34 +0xfb
created by main.(*WorkerManager).StartWorkPool
        /root/go_workspace/studygoup/05.go:58 +0x27

当然这里的文件路径跟你不一样。主要就是fatal error: all goroutines are asleep - deadlock!以及其中的chan receive.

1.原始代码

我是在构建发送任务与任务处理搭配协程池的时候所遇到的。代码架构如下:请添加图片描述
一开始就会生成这样一个workerPool,然后在执行任务的时候,goroutine挂掉了,就会被放进Wokers中,经过检查会被重新放回进WorkerPool中。
完整的代码逻辑如下:

package main

import (
	"fmt"
	"math"
	"sync"
)

// 创建一个无缓冲的channel
var workChan = make(chan int, 0)

type Worker struct {
	id  int
	err error
}

func (w *Worker) Work(c chan int, workers chan *Worker) {
	defer func() {
		wg.Done()
		// 捕获错误
		if r := recover(); r != nil {
			// 将错误断言
			if err, ok := r.(error); ok {
				w.err = err
			} else {
				w.err = fmt.Errorf("panic happened with [%v]", r)
			}
			// 通知主goroutine,将死亡的chan加入
			workers <- w
		}
	}()
	// 模拟业务,这里应该也会阻塞
	for each := range c {
		fmt.Println("Task:", each)
	}
}

type WorkerManager struct {
	Workers  chan *Worker
	nWorkers int
}

func NewWorkerManager(num int) *WorkerManager {
	return &WorkerManager{
		Workers:  make(chan *Worker, num),
		nWorkers: num,
	}
}

func (wm *WorkerManager) StartWorkPool() {
	// 开启指定数量的线程池
	for i := 0; i < wm.nWorkers; i++ {
		// 先创建一个worker对象
		wk := &Worker{id: i}
		go wk.Work(workChan, wm.Workers)
	}
	// 协程保活
	wm.KeepAlive()

}

func (wm *WorkerManager) KeepAlive() {
	// 这里会阻塞
	for wk := range wm.Workers {
		// log the error
		fmt.Printf("Worker %d stopped with err: [%v] 
", wk.id, wk.err)
		// reset err
		wk.err = nil
		// 当前这个wk已经死亡了,需要重新启动他的业务
		go wk.Work(workChan, wm.Workers)
	}
}

// 保证所有资源的顺利执行
var wg = sync.WaitGroup{}

func SendTask(c chan int, task int) {
	wg.Add(1)
	c <- task
}

func main() {
	// 创建线程管理者
	wm := NewWorkerManager(10)
	// 开启指定数量的线程池
	wm.StartWorkPool()
	// 总的任务数
	TaskCnt := math.MaxInt64
	// 依次发送任务
	for i := 0; i < TaskCnt; i++ {
		SendTask(workChan, i)
	}
	// 等待完成
	wg.Wait()
}

一旦按照上面的代码执行,就会立即报错!报错信息如最上面的详细信息所示。

2.错误原因分析

  • main函数中执行wm.StartWorkPool(),注意在这里是串行执行,也就是说要等wm.StartWorkPool()执行完毕才执行下面一句TaskCnt := math.MaxInt64
  • 然后我们深入wm.StartWorkPool()会发现,其中没有明显阻塞的地方,而是调用了一个wm.KeepAlive(),其中就是一直监视NewWorkerManager.Workers中是否有被添加worker,即wm.KeepAlive()中语句for wk := range wm.Workers {...,问题就是出现在这里,程序一直在这里阻塞,无法回到main中继续执行,要在后续的Work中才有可能发生向NewWorkerManager.Workers添加操作。因此直接造成了死锁

3. 解决方案

因为是在wm.StartWorkPool()——> wm.KeepAlive()——>for wk := range wm.Workers {...,最终造成了阻塞,因此我们在这三个中任一个位置开启协程来处理。这里我直接在main函数中使用go wm.StartWorkPool()即可最终解决问题。

func main() {
	// 创建线程管理者
	wm := NewWorkerManager(10)
	// 开启指定数量的线程池
	go wm.StartWorkPool()
	// 总的任务数
	TaskCnt := math.MaxInt64
	// 依次发送任务
	for i := 0; i < TaskCnt; i++ {
		SendTask(workChan, i)
	}
	// 等待完成
	wg.Wait()
}

4. 经验总结

  • 先分析可能发生阻塞的地方;【尤其是管道读取的地方】
  • 从主函数入手,依次分析并理清阻塞处的逻辑执行顺序;
  • 针对一块阻塞处,判断其写操作会不会在其后面,程序永远到不了;
  • 理清调用链逻辑,确定协程开启的地方。【开启协程的地方不会阻塞,立即往下执行】

如果实在还无法执行,将你的代码post到评论区,让大家一起帮你解决!

5. 练习

查看下面哪个地方会发生deadlock。该程序使用协程完成1~30的各个数的阶层,并把各个结果放进map中。最后显示出来

package main

import (
	"fmt"
	"sync"
)



var wg = sync.WaitGroup{}
var mapLock = sync.Mutex{}

func Calculate(nums chan int, resMap map[int]uint64) {

	// 进来会直接阻塞等待
	for num := range nums {
		var res uint64 = 1
		for i := 1; i <= num; i++ {
			res = res * uint64(i)
		}
		// 加锁保证并发的安全
		mapLock.Lock()
		// map并发不安全
		resMap[num] = res
		mapLock.Unlock()

	}
	wg.Done()

}

func SendTask(c chan int, task int) {
	wg.Add(1)
	c <- task
}

// 将发送任务与处理任务分开
func main() {
	// 创建一个无缓冲的管道
	numsChan := make(chan int)
	// 创建一个保存结果的map
	resMap := make(map[int]uint64)
	// 开启指定数量的协程
	for i := 0; i < 3; i++ {
		go Calculate(numsChan, resMap)
	}
	// 循环把各个数据放进管道中
	for i := 1; i <= 30; i++ {
		SendTask(numsChan, i)
	}

	wg.Wait()
	fmt.Println(resMap)
}

答案放在评论区

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。