8.4. Channels
如果説goroutine是Go語音程序的併發體的話,那麽channels它們之間的通信機製。一個channels是一個通信機製,它可以讓一個goroutine通過它給另一個goroutine發送值信息。每個channel都有一個特殊的類型,也就是channels可發送數據的類型。一個可以發送int類型數據的channel一般寫爲chan int。
使用內置的make函數,我們可以創建一個channel:
ch := make(chan int) // ch has type 'chan int'
和map類似,channel也一個對應make創建的底層數據結構的引用。當我嗎複製一個channel或用於函數參數傳遞時,我嗎隻是拷貝了一個channel引用,因此調用者何被調用者將引用同一個channel對象。和其它的引用類型一樣,channel的零值也是nil。
兩個相同類型的channel可以使用==運算符比較。如果兩個channel引用的是相通的對象,那麽比較的結果爲眞。一個channel也可以和nil進行比較。
一個channel有發送和接受兩個主要操作,都是通信行爲。一個發送語句將一個值從一個goroutine通過channel發送到另一個執行接收操作的goroutine。發送和接收兩個操作都是用<-
運算符。在發送語句中,<-
運算符分割channel和要發送的值。在接收語句中,<-
運算符寫在channel對象之前。一個不使用接收結果的接收操作也是合法的。
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
Channel還支持close操作,用於關閉channel,隨後對基於該channel的任何發送操作都將導致panic異常。對一個已經被close過的channel之行接收操作依然可以接受到之前已經成功發送的數據;如果channel中已經沒有數據的話講産生一個零值的數據。
使用內置的close函數就可以關閉一個channel:
close(ch)
以最簡單方式調用make函數創建的時一個無緩存的channel,但是我們也可以指定第二個整形參數,對應channel的容量。如果channel的容量大於零,那麽該channel就是帶緩存的channel。
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
我們將先討論無緩存的channel,然後在8.4.4節討論帶緩存的channel。
8.4.1. 不帶緩存的Channels
一個基於無緩存Channels的發送操作將導致發送者goroutine阻塞,直到另一個goroutine在相同的Channels上執行接收操作,當發送的值通過Channels成功傳輸之後,兩個goroutine可以繼續執行後面的語句。反之,如果接收操作先發生,那麽接收者goroutine也將阻塞,直到有另一個goroutine在相同的Channels上執行發送操作。
基於無緩存Channels的發送和接收操作將導致兩個goroutine做一次同步操作。因爲這個原因,無緩存Channels有時候也被稱爲同步Channels。當通過一個無緩存Channels發送數據時,接收者收到數據發生在喚醒發送者goroutine之前(譯註:happens before,這是Go語言併發內存模型的一個關鍵術語!)。
在討論併發編程時,當我們説x事件在y事件之前發生(happens before),我們併不是説x事件在時間上比y時間更早;我們要表達的意思是要保證在此之前的事件都已經完成了,例如在此之前的更新某些變量的操作已經完成,你可以放心依賴這些已完成的事件了。
當我們説x事件旣不是在y事件之前發生也不是在y事件之後發生,我們就説x事件和y事件是併發的。這併不是意味着x事件和y事件就一定是同時發生的,我們隻是不能確定這兩個事件發生的先後順序。在下一章中我們將看到,當兩個goroutine併發訪問了相同的變量時,我們有必要保證某些事件的執行順序,以避免出現某些併發問題。
在8.3節的客戶端程序,它在主goroutine中(譯註:就是執行main函數的goroutine)將標準輸入複製到server,因此當客戶端程序關閉標準輸入時,後台goroutine可能依然在工作。我們需要讓主goroutine等待後台goroutine完成工作後再退出,我們使用了一個channel來同步兩個goroutine:
gopl.io/ch8/netcat3
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}
當用戶關閉了標準輸入,主goroutine中的mustCopy函數調用將返迴,然後調用conn.Close()關閉讀和寫方向的網絡連接。關閉網絡鏈接中的寫方向的鏈接將導致server程序收到一個文件(end-of-file)結束的信號。關閉網絡鏈接中讀方向的鏈接將導致後台goroutine的io.Copy函數調用返迴一個“read from closed connection”(“從關閉的鏈接讀”)類似的錯誤,因此我們臨時移除了錯誤日誌語句;在練習8.3將會提供一個更好的解決方案。(需要註意的是go語句調用了一個函數字面量,這Go語言中啟動goroutine常用的形式。)
在後台goroutine返迴之前,它先打印一個日誌信息,然後向done對應的channel發送一個值。主goroutine在退出前先等待從done對應的channel接收一個值。因此,總是可以在程序退出前正確輸出“done”消息。
基於channels發送消息有兩個重要方面。首先每個消息都有一個值,但是有時候通訊的事實和發生的時刻也同樣重要。當我們更希望強調通訊發生的時刻時,我們將它稱爲消息事件。有些消息事件併不攜帶額外的信息,它僅僅是用作兩個goroutine之間的同步,這時候我們可以用struct{}
空結構體作爲channels元素的類型,雖然也可以使用bool或int類型實現同樣的功能,done <- 1
語句也比done <- struct{}{}
更短。
練習 8.3: 在netcat3例子中,conn雖然是一個interface類型的值,但是其底層眞實類型是*net.TCPConn
,代表一個TCP鏈接。一個TCP鏈接有讀和寫兩個部分,可以使用CloseRead和CloseWrite方法分别關閉它們。脩改netcat3的主goroutine代碼,隻關閉網絡鏈接中寫的部分,這樣的話後台goroutine可以在標準輸入被關閉後繼續打印從reverb1服務器傳迴的數據。(要在reverb2服務器也完成同樣的功能是比較睏難的;參考練習 8.4。)
8.4.2. 串聯的Channels(Pipeline)
Channels也可以用於將多個goroutine鏈接在一起,一個Channels的輸出作爲下一個Channels的輸入。這種串聯的Channels就是所謂的管道(pipeline)。下面的程序用兩個channels將三個goroutine串聯起來,如圖8.1所示。
第一個goroutine是一個計數器,用於生成0、1、2、……形式的整數序列,然後通過channel將該整數序列發送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每個整數求平方,然後將平方後的結果通過第二個channel發送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每個整數。爲了保持例子清晰,我們有意選擇了非常簡單的函數,當然三個goroutine的計算很簡單,在現實中確實沒有必要爲如此簡單的運算構建三個goroutine。
gopl.io/ch8/pipeline1
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}
如您所料,上面的程序將生成0、1、4、9、……形式的無窮數列。像這樣的串聯Channels的管道(Pipelines)可以用在需要長時間運行的服務中,每個長時間運行的goroutine可能會包含一個死循環,在不同goroutine的死循環內部使用串聯的Channels來通信。但是,如果我們希望通過Channels隻發送有限的數列該如何處理呢?
如果發送者知道,沒有更多的值需要發送到channel的話,那麽讓接收者也能及時知道沒有多餘的值可接收將是有用的,因爲接收者可以停止不必要的接收等待。這可以通過內置的close函數來關閉channel實現:
close(naturals)
當一個channel被關閉後,再向該channel發送數據將導致panic異常。當一個被關閉的channel中已經發送的數據都被成功接收後,後續的接收操作將不再阻塞,它們會立卽返迴一個零值。關閉上面例子中的naturals變量對應的channel併不能終止循環,它依然會收到一個永無休止的零值序列,然後將它們發送給打印者goroutine。
沒有辦法直接測試一個channel是否被關閉,但是接收操作有一個變體形式:它多接收一個結果,多接收的第二個結果是一個布爾值ok,ture表示成功從channels接收到值,false表示channels已經被關閉併且里面沒有值可接收。使用這個特性,我們可以脩改squarer函數中的循環代碼,當naturals對應的channel被關閉併沒有值可接收時跳出循環,併且也關閉squares對應的channel.
// Squarer
go func() {
for {
x, ok := <-naturals
if !ok {
break // channel was closed and drained
}
squares <- x * x
}
close(squares)
}()
因爲上面的語法是笨拙的,而且這種處理模式很場景,因此Go語言的range循環可直接在channels上面迭代。使用range循環是上面處理模式的簡潔語法,它依次從channel接收數據,當channel被關閉併且沒有值可接收時跳出循環。
在下面的改進中,我們的計數器goroutine隻生成100個含數字的序列,然後關閉naturals對應的channel,這將導致計算平方數的squarer對應的goroutine可以正常終止循環併關閉squares對應的channel。(在一個更複雜的程序中,可以通過defer語句關閉對應的channel。)最後,主goroutine也可以正常終止循環併退出程序。
gopl.io/ch8/pipeline2
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}
其實你併不需要關閉每一個channel。隻要當需要告訴接收者goroutine,所有的數據已經全部發送時才需要關閉channel。不管一個channel是否被關閉,當它沒有被引用時將會被Go語言的垃圾自動迴收器迴收。(不要將關閉一個打開文件的操作和關閉一個channel操作混淆。對於每個打開的文件,都需要在不使用的使用調用對應的Close方法來關閉文件。)
視圖重複關閉一個channel將導致panic異常,視圖關閉一個nil值的channel也將導致panic異常。關閉一個channels還會觸發一個廣播機製,我們將在8.9節討論。
8.4.3. 單方向的Channel
隨着程序的增長,人們習慣於將大的函數拆分爲小的函數。我們前面的例子中使用了三個goroutine,然後用兩個channels連鏈接它們,它們都是main函數的局部變量。將三個goroutine拆分爲以下三個函數是自然的想法:
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
其中squarer計算平方的函數在兩個串聯Channels的中間,因此擁有兩個channels類型的參數,一個用於輸入一個用於輸出。每個channels都用有相同的類型,但是它們的使用方式想反:一個隻用於接收,另一個隻用於發送。參數的名字in和out已經明確表示了這個意圖,但是併無法保證squarer函數向一個in參數對應的channels發送數據或者從一個out參數對應的channels接收數據。
這種場景是典型的。當一個channel作爲一個函數參數是,它一般總是被專門用於隻發送或者隻接收。
爲了表明這種意圖併防止被濫用,Go語言的類型繫統提供了單方向的channel類型,分别用於隻發送或隻接收的channel。類型chan<- int
表示一個隻發送int的channel,隻能發送不能接收。相反,類型<-chan int
表示一個隻接收int的channel,隻能接收不能發送。(箭頭<-
和關鍵字chan的相對位置表明了channel的方向。)這種限製將在編譯期檢測。
因爲關閉操作隻用於斷言不再向channel發送新的數據,所以隻有在發送者所在的goroutine才會調用close函數,因此對一個隻接收的channel調用close將是一個編譯錯誤。
這是改進的版本,這一次參數使用了單方向channel類型:
gopl.io/ch8/pipeline3
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
調用counter(naturals)將導致將chan int
類型的naturals隱式地轉換爲chan<- int
類型隻發送型的channel。調用printer(squares)也會導致相似的隱式轉換,這一次是轉換爲<-chan int
類型隻接收型的channel。任何雙向channel向單向channel變量的賦值操作都將導致該隱式轉換。這里併沒有反向轉換的語法:也就是不能一個將類似chan<- int
類型的單向型的channel轉換爲chan int
類型的雙向型的channel。
8.4.4. 帶緩存的Channels
帶緩存的Channel內部持有一個元素隊列。隊列的最大容量是在調用make函數創建channel時通過第二個參數指定的。下面的語句創建了一個可以持有三個字符串元素的帶緩存Channel。圖8.2是ch變量對應的channel的圖形表示形式。
ch = make(chan string, 3)
向緩存Channel的發送操作就是向內部緩存隊列的尾部插入原因,接收操作則是從隊列的頭部刪除元素。如果內部緩存隊列是滿的,那麽發送操作將阻塞直到因另一個goroutine執行接收操作而釋放了新的隊列空間。相反,如果channel是空的,接收操作將阻塞直到有另一個goroutine執行發送操作而向隊列插入元素。
我們可以在無阻塞的情況下連續向新創建的channel發送三個值:
ch <- "A"
ch <- "B"
ch <- "C"
此刻,channel的內部緩存隊列將是滿的(圖8.3),如果有第四個發送操作將發生阻塞。
如果我們接收一個值,
fmt.Println(<-ch) // "A"
那麽channel的緩存隊列將不是滿的也不是空的(圖8.4),因此對該channel執行的發送或接收操作都不會發送阻塞。通過這種方式,channel的緩存隊列解耦了接收和發送的goroutine。
在某些特殊情況下,程序可能需要知道channel內部緩存的容量,可以用內置的cap函數獲取:
fmt.Println(cap(ch)) // "3"
同樣,對於內置的len函數,如果傳入的是channel,那麽將返迴channel內部緩存隊列中有效元素的個數。因爲在併發程序中該信息會隨着接收操作而失效,但是它對某些故障診斷和性能優化會有幫助。
fmt.Println(len(ch)) // "2"
在繼續執行兩次接收操作後channel內部的緩存隊列將又成爲空的,如果有第四個接收操作將發生阻塞:
fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"
在這個例子中,發送和接收操作都發生在同一個goroutine中,但是在眞是的程序中它們一般由不同的goroutine執行。Go語言新手有時候會將一個帶緩存的channel當作同一個goroutine中的隊列使用,雖然語法看似簡單,但實際上這是一個錯誤。Channel和goroutine的調度器機製是緊密相連的,一個發送操作——或許是整個程序——可能會永遠阻塞。如果你隻是需要一個簡單的隊列,使用slice就可以了。
下面的例子展示了一個使用了帶緩存channel的應用。它併發地向三個鏡像站點發出請求,三個鏡像站點分散在不同的地理位置。它們分别將收到的響應發送到帶緩存channel,最後接收者隻接收第一個收到的響應,也就是最快的那個響應。因此mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應之前就返迴了結果。(順便説一下,多個goroutines併發地向同一個channel發送數據,或從同一個channel接收數據都是常見的用法。)
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
如果我們使用了無緩存的channel,那麽兩個慢的goroutines將會因爲沒有人接收而被永遠卡住。這種情況,稱爲goroutines洩漏,這將是一個BUG。和垃圾變量不同,洩漏的goroutines併不會被自動迴收,因此確保每個不再需要的goroutine能正常退出是重要的。
關於無緩存或帶緩存channels之間的選擇,或者是帶緩存channels的容量大小的選擇,都可能影響程序的正確性。無緩存channel更強地保證了每個發送操作與相應的同步接收操作;但是對於帶緩存channel,這些操作是解耦的。同樣,卽使我們知道將要發送到一個channel的信息的數量上限,創建一個對應容量大小帶緩存channel也是不現實的,因爲這要求在執行任何接收操作之前緩存所有已經發送的值。如果未能分配足夠的緩衝將導致程序死鎖。
Channel的緩存也可能影響程序的性能。想象一家蛋糕店有三個廚師,一個烘焙,一個上醣衣,還有一個將每個蛋糕傳遞到它下一個廚師在生産線。在狹小的廚房空間環境,每個廚師在完成蛋糕後必鬚等待下一個廚師已經準備好接受它;這類似於在一個無緩存的channel上進行溝通。
如果在每個廚師之間有一個放置一個蛋糕的額外空間,那麽每個廚師就可以將一個完成的蛋糕臨時放在那里而馬上進入下一個蛋糕在製作中;這類似於將channel的緩存隊列的容量設置爲1。隻要每個廚師的平均工作效率相近,那麽其中大部分的傳輸工作將是迅速的,個體之間細小的效率差異將在交接過程中瀰補。如果廚師之間有更大的額外空間——也是就更大容量的緩存隊列——將可以在不停止生産線的前提下消除更大的效率波動,例如一個廚師可以短暫地休息,然後在加快趕上進度而不影響其其他人。
另一方面,如果生産線的前期階段一直快於後續階段,那麽它們之間的緩存在大部分時間都將是滿的。相反,如果後續階段比前期階段更快,那麽它們之間的緩存在大部分時間都將是空的。對於這類場景,額外的緩存併沒有帶來任何好處。
生産線的隱喻對於理解channels和goroutines的工作機製是很有幫助的。例如,如果第二階段是需要精心製作的複雜操作,一個廚師可能無法跟上第一個廚師的進度,或者是無法滿足第階段廚師的需求。要解決這個問題,我們可以雇傭另一個廚師來幫助完成第二階段的工作,他執行相同的任務但是獨立工作。這類似於基於相同的channels創建另一個獨立的goroutine。
我們沒有太多的空間展示全部細節,但是gopl.io/ch8/cake包模擬了這個蛋糕店,可以通過不同的參數調整。它還對上面提到的幾種場景提供對應的基準測試(§11.4) 。