您现在的位置是:首页 >学无止境 >[Go语言实战]并发模式runner网站首页学无止境
[Go语言实战]并发模式runner
第7章 并发模式 runner 笔记
runner
type Runner struct {
// interrupt 通道报告从操作系统发送的信号
interrupt chan os.Signal
// complete 通道报告处理任务已经完成
complete chan error
// <-chan 是一个输出通道, 可以从里面取出数据
// chan<- 是一个输入通道,可以往里面写入数据
// timeout 报告处理任务已经超时
timeout <-chan time.Time
tasks []func(int)
}
// ErrInterrupr 会在接收到操作系统的事件时返回
var ErrInterrupr = errors.New("received interrupt")
// ErrTimeout 会在任务执行超时时返回
var ErrTimeout = errors.New("received timeout")
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d), // 返回一个接受通道 <- chan
}
}
// Add 将一个任务附加到 Runner 上。这个任务是一个
// 接收一个 int 类型的 ID 作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
// Start 执行所有任务,并监视通道事件
func (r *Runner) Start() error {
// 我们希望接收所有中断信号
signal.Notify(r.interrupt, os.Interrupt)
// 用不同的 goroutine 执行不同的任务,就是另起一个goroutine
go func() {
r.complete <- r.run()
}()
select {
// 当任务处理完成时发出的信号
case err := <-r.complete: // 要么在规定的时间内完成了分配的工作,要么收到了操作系统的中断信号
return err
// 当任务处理程序运行超时时发出的信号
case <-r.timeout:
return ErrTimeout
}
}
// run 执行每一个已注册的任务,顺序执行
func (r *Runner) run() error {
for id, task := range r.tasks {
if r.gotInterrupt() {
return ErrInterrupr
}
// tasks 是盛放func(int) 类型的切片[],此时 task就是一个函数,task(id) 就是正在执行的函数
task(id)
}
return nil
}
// gotInterrupt 验证是否接收到了中断信号
func (r *Runner) gotInterrupt() bool {
select {
// 当中断事件被触发时发出的信号
case <-r.interrupt:
// 停止接收后续的任何信号
signal.Stop(r.interrupt)
return true
default:
return false
}
}
// timeout 规定了必须在多少秒内处理完成
const timeout = 3 * time.Second
func main() {
log.Println("starting work..")
// 为本次执行分配超时时间
r := New(timeout)
/**
ps: r := New(timeout) 设置的是r 总共执行的超时时间不得超过3秒, 不是tasks[] 切片中每个任务执行的超时时间
当tasks[0] 时间超过3秒时直接"Terminating due to timeout" 退出执行,所以后面两个createTask() 根本不会执行
*/
// 加入要执行的任务
r.Add(createTask(), createTask(), createTask(), createTask(), createTask())
// 执行任务并处理结果
if err := r.Start(); err != nil {
switch err {
case ErrTimeout:
log.Println("Terminating due to timeout")
os.Exit(1)
case ErrInterrupr:
log.Println("Terminating due to interrupt")
os.Exit(2)
}
}
log.Println("Process ended")
}
// createTask 返回一个根据 id ,休眠指定秒数的示例任务
func createTask() func(id int) {
return func(id int) {
log.Printf("process - Task #%d, Sleep %d", id, time.Duration(id))
time.Sleep(time.Duration(id) * time.Second)
// 测试 r 超时时间总共不得超过3秒
//time.Sleep(4 * time.Second)
}
}
signal.Stop()
func (r *Runner) gotInterrupt() bool {
select {
// 当中断事件被触发时发出的信号
case <-r.interrupt:
// 停止接收后续的任何信号
signal.Stop(r.interrupt)
return true
default:
return false
}
}
在 Go 语言中,signal.Stop() 函数用于停止一个信号的传递。在这里,r.interrupt 是一个 os.Signal 类型的通道变量,它用于接收操作系统发送的 SIGINT 信号(即用户按下 Ctrl+C 组合键时会触发该信号)。当程序结束时,我们一般需要停止对该信号的监听,以避免出现不必要的错误。
因此,signal.Stop(r.interrupt) 的作用是停止对 r.interrupt 通道的监听,即不再接收 SIGINT 信号。这通常在程序结束前进行清理工作时使用,以确保不会在程序退出后继续接收信号,导致程序异常终止或发生其他错误。
signal.Notify(r.interrupt, os.Interrupt)
在 Go 语言中,signal.Notify() 函数用于向一个通道发送操作系统信号。在这里,我们使用该函数将 os.Interrupt 信号(即用户按下 Ctrl+C 组合键时会触发该信号)发送到了 r.interrupt 通道。
具体来说,Notify() 函数接受两个参数:第一个参数是一个通道,用于接收指定的信号;第二个参数是一个可变参数,代表要监听的信号列表。当程序接收到任意一个被监听的信号时,就会向通道中发送该信号,从而触发相应的处理逻辑。
在这个例子中,我们使用了 Notify() 函数将 os.Interrupt 信号发送到了 r.interrupt 通道,以便在程序运行过程中能够捕获该信号并进行相应的处理。这样做可以使程序更加健壮和灵活,能够在接收到中断信号时优雅地退出或进行清理工作。
单向通道
在 Go 语言中,<-chan 和 chan<- 是用于声明单向通道的关键字,它们用于限制通道的读写方向。以下是它们的区别:
● <-chan 表示只能从通道中读取数据,不能向通道中写入数据。这种类型的通道称为接收通道。
● chan<- 表示只能向通道中写入数据,不能从通道中读取数据。这种类型的通道称为发送通道。
举个例子,在以下代码中,
func f(ch <-chan int) {
// 从 ch 中读取数据
}
func g(ch chan<- int) {
// 向 ch 中写入数据
}
函数 f 接受一个接收通道作为参数,函数 g 接受一个发送通道作为参数。这样就可以限制通道的读写方向,以确保代码的正确性和安全性。
需要注意的是,chan 关键字本身表示通道类型,既可以用于声明普通的双向通道,也可以用于声明单向通道。但是,一旦将通道定义为单向通道类型,就不能再转换为双向通道类型。
go os.Exit(1) os.Exit(2)
os.Exit(1) 和 os.Exit(2) 都是用于退出当前程序的函数。
当调用 os.Exit(1) 时,程序会以非零状态码退出。根据惯例,非零状态码表示程序异常终止。这个状态码可以在命令行中通过 $ echo $? 命令获取到。
同样地,当调用 os.Exit(2) 时,程序也会以非零状态码退出。但是,这个状态码通常被用于指示特定类型的错误,比如命令行参数错误或者文件读写错误等。
需要注意的是,调用 os.Exit() 函数会立即终止程序的执行,因此应该谨慎使用,并且在退出之前应该确保已经完成了所有必要的清理工作,比如关闭文件句柄、释放资源等。