Go 的并发:Goroutine 与 Channel 介绍

本文同步发表于 Limitless Ping

Goroutine 像是 Go 语言的 thread, 使 Go 建立多工处理, 搭配 Channel 使 Goroutine 操作简单化, 本文介绍 Goroutine 及 Channel 的使用方式。

範例程式码可以在 golang-concurrency-example 中找到,每个程式第一行可以找到其範例档名。

单执行绪

在单执行绪下,每行程式码都会依照顺序执行。

// single-thread.gofunc say(s string) {    for i := 0; i < 5; i++ {        time.Sleep(100 * time.Millisecond)        fmt.Println(s)    }}func main() {    say("world")    say("hello")}
worldworldworldworldworldhellohellohellohellohello

上例会先执行完 say("world") 后再执行 say("hello")

single-thread

但有时个别方法的处理是没有先后顺序的,这时善用多执行绪就可以大大的提升效率。

多执行绪

在多执行绪下,最多可以同时执行与 CPU 数相等的 Goroutine。

// multi-thread.gofunc main() {    go say("world")    say("hello")}
worldhellohelloworldworldhellohelloworldworldhello

如此一来, say("world")会跑在另一个执行绪(Goroutine)上,使其并行执行。

multi-thread

CPU 数可以使用 runtime.NumCPU() 取得。

Goroutine 介绍

可以想成建立了一个 Goroutine 就是建立了一个新的 Thread。

go f(x, y, z)
go 开头的函式叫用可以使 f 跑在另一个 Goroutine 上f, x, y, z 取自目前的 goroutinemain 函式也是跑在 Goroutine 上Main Goroutine 执行结束后, 其他的 Goroutine 会跟着强制关闭

等待

多执行绪下,经常需要处理的是执行绪之间的状态管理,其中一个经常发生的事情是等待,例如A执行绪需要等B执行绪计算并取得资料后才能继续往下执行,在这情况下等待就变得十分重要。

应该等待的时机

func main() {    go say("world")    go say("hello")}

这个状态下会有三个 Goroutine:

mainsay("world")say("hello")

这里的问题发生在 main Goroutine 结束时,另外两个 say Goroutine 会被强制关闭导致结果错误,这时就需要等待其他的 Goroutine 结束后 main Goroutine 才能结束。

接下来会介绍三种等待的方式,并且分析其利弊:

time.Sleep: 休眠指定时间sync.WaitGroup: 等待直到指定数量的 Done() 叫用Channel 阻塞: 使用 Channel 阻塞机制,使用接收时等待的特性避免执行绪继续执行

time.Sleep

使 Goroutine 休眠,让其他的 Goroutine 在 main 结束前有时间执行完成。

// sleep.gofunc main() {    go say("world")    go say("hello")    time.Sleep(5 * time.Second)}

sleep

缺点:

休息指定时间可能会比 Goroutine 需要执行的时间长或短,太长会耗费多余的时间,太短会使其他 Goroutine 无法完成

sync.WaitGroup

// wait-group.gofunc say(s string, wg *sync.WaitGroup) {    defer wg.Done()    for i := 0; i < 5; i++ {        time.Sleep(100 * time.Millisecond)        fmt.Println(s)    }}func main() {    wg := new(sync.WaitGroup)    wg.Add(2)    go say("world", wg)    go say("hello", wg)    wg.Wait()}

wait-group

产生与想要等待的 Goroutine 同样多的 WaitGroup Counter将 WaitGroup 传入 Goroutine 中,在执行完成后叫用 wg.Done() 将 Counter 减一wg.Wait() 会等待直到 Counter 减为零为止

优点

避免时间预估的错误

缺点

需要手动配置对应的 Counter

Channel

最后介绍的是使用 Channel 等待, 原为 Goroutine 沟通时使用的,但因其阻塞的特性,使其可以当作等待 Goroutine 的方法。

// channel-wait.gofunc say(s string, c chan string) {    for i := 0; i < 5; i++ {        time.Sleep(100 * time.Millisecond)        fmt.Println(s)    }    c <- "FINISH"}func main() {    ch := make(chan string)    go say("world", ch)    go say("hello", ch)    <-ch    <-ch}

channel-wait

起了两个 Goroutine(say("world", ch), say("hello", ch)) ,因此需要等待两个 FINISH 推入 Channel 中才能结束 Main Goroutine。

优点

避免时间预估的错误语法简洁

Channel 阻塞的方法为 Go 语言中等待的主要方式。

多执行绪下的共享变数

在执行绪间使用同样的变数时,最重要的是确保变数在当前的正确性,在没有控制的情况下极有可能发生问题,下面有个例子:

// total-error.gofunc main() {    total := 0    for i := 0; i < 1000; i++ {        go func() {            total++        }()    }    time.Sleep(time.Second)    fmt.Println(total)}
958

total-error

假设目前加到28,在多执行绪的情况下:

goroutine1 取值 28 做运算goroutine2 有可能在 goroutine1total++ 前就取 total 的值,因此有可能取到 28这样的情况下做两次加法的结果会是 29 而非 30

在多个 goroutine 里对同一个变数total做加法运算,在赋值时无法确保其为安全的而导致运算错误,此问题称为 Race Condition。

互斥锁(sync.Mutex)

在这种状况下,可以使用互斥锁(sync.Mutex)来保证变数的安全:

// total-mutex.gotype SafeNumber struct {    v   int    mux sync.Mutex // 互斥锁}func main() {    total := SafeNumber{v: 0}    for i := 0; i < 1000; i++ {        go func() {            total.mux.Lock()            total.v++            total.mux.Unlock()        }()    }    time.Sleep(time.Second)    total.mux.Lock()    fmt.Println(total.v)    total.mux.Unlock()}
1000

total-mutex

互斥锁使用在资料结构(struct)中,用以确保结构中变数读写时的安全,它提供两个方法:

LockUnlock

LockUnlock 中间,会使其他的 Goroutine 等待,确保此区块中的变数安全。

藉由 Channel 保证变数的安全性

// total-channel.gofunc main() {    total := 0    ch := make(chan int, 1)    ch <- total    for i := 0; i < 1000; i++ {        go func() {            ch <- <-ch + 1        }()    }    time.Sleep(time.Second)    fmt.Println(<-ch)}
1000

total-channel

goroutine1 拉出 total 后,Channel 中没有资料了因为 Channel 中没有资料,因此造成 goroutine2 等待goroutine1 计算完成后,将 total 推入 Channelgoroutine2 等到 Channel 中有资料,拉出后结束等待,继续做运算

因为 Channel 推入及拉出时等待的特性,被拉出来做计算的值会保证是安全的。

因为此範例一定要拉出 Channel 资料才能做运算,所以使用非立即阻塞的 Buffered Channel ,与 Unbuffered Channel 的差别等下会说明。

上述的三个例子在 main goroutine 中都使用 time.Sleep 避免程式提前结束。

Channel 介绍

上面藉由两个在多执行绪中重要的议题:等待及变数的共享,带出 Channel 强大的处理能力,接着来深入了解一下 Channel。

Channel 可以想成一条管线,这条管线可以推入数值,并且也可以将数值拉取出来。

因为 Channel 会等待至另一端完成推入/拉出的动作后才会继续往下处理,这样的特性使其可以在 Goroutines 间同步的处理资料,而不用使用明确的 lock, unlock 等方法。

建立 Channel

ch := make(chan int) // 建立 int 型别的 Channel

推入/拉出 Channel 内的值,使用 <- 箭头运算子:

Channel 在 <- 左边:将箭头右边的数值推入 Channel 中
ch <- v    // Send v to channel ch.v := <-ch  // Receive from ch, and assign value to v.

Channel 的阻塞

Goroutine 使用 Channel 时有两种情况会造成阻塞:

将资料推入 Channel,但其他 Goroutine 还未拉取资料时,将资料推入的 Goroutine 会被迫等待其他 Goroutine 拉取资料才能往下执行

channel-sleep-push

当 Channel 中没有资料,但要从中拉取时,想要拉取资料的 Goroutine 会被迫等待其他 Goroutine 推入资料并自己完成拉取后才能往下执行

channel-sleep-pull

Goroutine 推资料入 Channel 时的等待情境

// channel-block-push.gofunc main() {    ch := make(chan string)    go func() { // calculate goroutine        fmt.Println("calculate goroutine starts calculating")        time.Sleep(time.Second) // Heavy calculation        fmt.Println("calculate goroutine ends calculating")        ch <- "FINISH" // goroutine 执行会在此被迫等待        fmt.Println("calculate goroutine finished")    }()    time.Sleep(2 * time.Second) // 使 main 比 goroutine 慢    fmt.Println(<-ch)    time.Sleep(time.Second)    fmt.Println("main goroutine finished")}
calculate goroutine starts calculatingcalculate goroutine ends calculatingFINISHcalculate goroutine finishedmain goroutine finished

此例使用 time.Sleep 强迫 main 执行慢于 calculate,现在来观察输出的结果:

calculate 会先执行并且计算完成calculate 将 FINISH 讯号推入 Channel但由于目前 main 还未拉取 Channel 中的资料,所以 calculate 会被迫等待,因此 calculate 的最后一行 fmt.Println("main goroutine finished") 没有马上输出在画面上main 拉取了 Channel 中的资料calculate 执行fmt.Println("main goroutine finished") 并结束main 执行完成

Goroutine 拉资料出 Channel 时的等待情境

// channel-block-pull.gofunc main() {    ch := make(chan string)    go func() {        fmt.Println("calculate goroutine starts calculating")        time.Sleep(time.Second) // Heavy calculation        fmt.Println("calculate goroutine ends calculating")        ch <- "FINISH"        fmt.Println("calculate goroutine finished")    }()    fmt.Println("main goroutine is waiting for channel to receive value")    fmt.Println(<-ch) // goroutine 执行会在此被迫等待    fmt.Println("main goroutine finished")}
main goroutine is waiting for channel to receive valuecalculate goroutine starts calculatingcalculate goroutine ends calculatingcalculate goroutine finishedFINISHmain goroutine finished
main 因拉取的时候 calculate 还没将资料推入 Channel 中,因此 main 会被迫等待,因此 main 的最后一行 fmt.println 没有马上输出在画面上calculate 执行并且计算完成calculate 将 FINISH 推入 Channelcalculate 执行完成main 拉取了 Channel 中的资料并且执行完成

Unbuffered Channel

前面一直提到的是 Unbuffered Channel,此种 Channel 只要

推入一个资料会造成推入方的等待拉出时没有资料会造成拉出方的等待

使用 Unbuffered Channel 的坏处是:如果推入方的执行一次的时间较拉取方短,会造成推入方被迫等待拉取方才能在做下一次的处理,这样的等待是不必要并且需要被避免的。

为了解决推入方等待问题,可以使用另一种 Channel:Buffered Channel。

Buffered Channel

ch: make(chan int, 100)

Buffered Channel 的宣告会在第二个参数中定义 buffer 的长度,它只会在 Buffered 中资料填满以后才会阻塞造成等待,以上例来说:第101个资料推入的时候,推入方的 Goroutine 才会等待。

buffered-channel

下面的例子分别使用 Buffered Channel 跟 Unbuffered Channel 的差别:

// unbuffered-channel-error.gofunc main() {    ch := make(chan int)    ch <- 1 // 等到天荒地老    fmt.Println(<-ch)}
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan send]:main.main()        /go/unbuffered-channel-error.go:9 +0x59exit status 2

上例使用 Unbuffered Channel:

只有一条 Goroutine:main推入 1 后因为还没有其他 Goroutine 拉取 Channel 中的资料,所以进入阻塞状态因为 main 已经在推入资料时阻塞,所以拉取的程式永远不会被执行,造成死结

unbuffered-channel-error

在相同的情况下,Buffered Channel 并不会被阻塞:

// buffered-channel.gofunc main() {    ch := make(chan int, 1)    ch <- 1    fmt.Println(<-ch)}
1

原因是:

推入 1 后 Channel 内的资料数为1并没有超过 Buffer 的长度1,所以不会被阻塞因为没有阻塞,所以下一行拉取的程式码可以被执行,并且完成执行

buffered-channel-work

Loop 中的 Channel

在迴圈中的 Channel 可以藉由第二个回传值 ok 确认 Channel 是否被关闭,如果被关闭的话代表此 Channel 已经不再使用,可以结束巡览。

// for-loop.gofunc main() {    c := make(chan int)    go func() {        for i := 0; i < 10; i++ {            c <- i        }        close(c) // 关闭 Channel    }()    for {        v, ok := <-c        if !ok { // 判断 Channel 是否关闭            break        }        fmt.Println(v)    }}
0123456789

如果对 Closed Channel 推入资料的话会造成 Panic:

// closed-channel-panic.gofunc main() {    c := make(chan int)    close(c)    c <- 0 // Panic!!!}
panic: send on closed channel

为了避免将资料推入已关闭的 Channel 中造成 Panic,Channel 的关闭应该由推入的 Goroutine 处理。

range 中的 Channel

range 是可以巡览 Channel 的,终止条件为 Channel 的状态为已关闭的(Closed):

// range.gofunc main() {    c := make(chan int, 10)    go func() {        for i := 0; i < 10; i++ {            c <- i        }        close(c) // 关闭 Channel    }()    for i := range c { // 在 close 后跳出迴圈        fmt.Println(i)    }}

使用 select 避免等待

在 Channel 推入/拉取时,会有一段等待的时间而造成 Goroutine 无法回应,如果此 Goroutine 是负责处理画面的,使用者就会看到画面 lag 的情况,这是我们不想见的情况。

例如之前提到的例子:

// block.gofunc main() {    ch := make(chan string)    go func() {        fmt.Println("calculate goroutine starts calculating")        time.Sleep(time.Second) // Heavy calculation        fmt.Println("calculate goroutine ends calculating")        ch <- "FINISH"        fmt.Println("calculate goroutine finished")    }()    fmt.Println("main goroutine is waiting for channel to receive value")    fmt.Println(<-ch) // goroutine 执行会在此被迫等待    fmt.Println("main goroutine finished")}
main goroutine is waiting for channel to receive value # main goroutine 阻塞calculate goroutine starts calculatingcalculate goroutine ends calculatingcalculate goroutine finishedFINISH # main goroutine 解除阻塞main goroutine finished

main goroutine 要拉取 ch 的资料时,会被迫等待,这时会无法回馈目前的状态给使用者,造成卡顿的清况。

这时可以使用 Go 提供的 select 语法,让开发者可以很轻鬆的处理 Channel 的多种情况,包括阻塞时的处理。

// select.gofunc main() {    ch := make(chan string)    go func() {        fmt.Println("calculate goroutine starts calculating")        time.Sleep(time.Second) // Heavy calculation        fmt.Println("calculate goroutine ends calculating")        ch <- "FINISH"        time.Sleep(time.Second)        fmt.Println("calculate goroutine finished")    }()    for {        select {        case <-ch: // Channel 中有资料执行此区域            fmt.Println("main goroutine finished")            return        default: // Channel 阻塞的话执行此区域            fmt.Println("WAITING...")            time.Sleep(500 * time.Millisecond)        }    }}
WAITING... # main goroutine 在阻塞时可以回应calculate goroutine starts calculatingWAITING... # main goroutine 在阻塞时可以回应WAITING... # main goroutine 在阻塞时可以回应calculate goroutine ends calculatingmain goroutine finished # main goroutine 解除阻塞并结束程式

将刚刚的例子改为 select 来处理,可以使 Channel 的推入/拉取不会阻塞:

会在没有阻塞的情况下才会执行对应的区块case <-ch:: 会等到没有阻塞情况时(ch 内有资料)才会执行default:: 在所有的 case 都阻塞的情况下执行

因为有 default 可以设置,当 Channel 阻塞时也可以藉由 default 输出资讯让使用者知道。

总结

一开始提到了单执行绪跟多执行绪的差别,接着带出 Goroutine ,并介绍各种等待方式(time.Sleep, sync.WaitGroup 及 Channel)和执行绪间分享变数的问题(Race Condition)及解决方法(sync.Mutex 及 Channel),从而带出 Channel 在执行绪中方便强大的能力。

再来讲述 Channel 的使用方式,及其阻塞的时机(推入阻塞及拉取阻塞),接着说明 Unbuffered 及 Buffered Channel 的差别,并且说明可以藉由 Unbuffered Channel 降低效能上的损失。

Channel 传回的第二个参数:ok,可以判断此 Channel 是否已经关闭,并被 range 用在结束巡览的判断中。

最后说明了 select 可以 Channel 在阻塞时让 Goroutine 保持非阻塞的状态避免卡顿。

藉由 Goroutine 及 Channel 简单的语法但是强大的能力使工程师开发多工程式的时候可以写出优雅又易于维护的代码,是 Go 语言的优势之一。

参考资料

A tour of Go - GoroutinesTrevor Forrey - Learning Go’s Concurrency Through Illustrations

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章