8.5. 併發的循環

本節中,我們會探索一些用來在併行時循環迭代的常見併發模型。我們會探究從全尺寸圖片生成一些縮略圖的問題。gopl.io/ch8/thumbnail包提供了ImageFile函數來幫我們拉伸圖片。我們不會説明這個函數的實現,隻需要從gopl.io下載它。

gopl.io/ch8/thumbnail ```go package thumbnail

// ImageFile reads an image from infile and writes // a thumbnail-size version of it in the same directory. // It returns the generated file name, e.g., “foo.thumb.jpg”. func ImageFile(infile string) (string, error) ```

下面的程序會循環迭代一些圖片文件名,併爲每一張圖片生成一個縮略圖:

gopl.io/ch8/thumbnail go // makeThumbnails makes thumbnails of the specified files. func makeThumbnails(filenames []string) { for _, f := range filenames { if _, err := thumbnail.ImageFile(f); err != nil { log.Println(err) } } }

顯然我們處理文件的順序無關緊要,因爲每一個圖片的拉伸操作和其它圖片的處理操作都是彼此獨立的。像這種子問題都是完全彼此獨立的問題被叫做易併行問題(譯註:embarrassingly parallel,直譯的話更像是尷尬併行)。易併行問題是最容易被實現成併行的一類問題(廢話),併且是最能夠享受併發帶來的好處,能夠隨着併行的規模線性地擴展。

下面讓我們併行地執行這些操作,從而將文件IO的延遲隱藏掉,併用上多核cpu的計算能力來拉伸圖像。我們的第一個併發程序隻是使用了一個go關鍵字。這里我們先忽略掉錯誤,之後再進行處理。

// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
    for _, f := range filenames {
        go thumbnail.ImageFile(f) // NOTE: ignoring errors
    }
}

這個版本運行的實在有點太快,實際上,由於它比最早的版本使用的時間要短得多,卽使當文件名的slice中隻包含有一個元素。這就有點奇怪了,如果程序沒有併發執行的話,那爲什麽一個併發的版本還是要快呢?答案其實是makeThumbnails在它還沒有完成工作之前就已經返迴了。它啟動了所有的goroutine,沒一個文件名對應一個,但沒有等待它們一直到執行完畢。

沒有什麽直接的辦法能夠等待goroutine完成,但是我們可以改變goroutine里的代碼讓其能夠將完成情況報告給外部的goroutine知曉,使用的方式是向一個共享的channel中發送事件。因爲我們已經知道內部的goroutine隻有len(filenames),所以外部的goroutine隻需要在返迴之前對這些事件計數。

// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
    ch := make(chan struct{})
    for _, f := range filenames {
        go func(f string) {
            thumbnail.ImageFile(f) // NOTE: ignoring errors
            ch <- struct{}{}
        }(f)
    }
    // Wait for goroutines to complete.
    for range filenames {
        <-ch
    }
}

註意我們將f的值作爲一個顯式的變量傳給了函數,而不是在循環的閉包中聲明:

for _, f := range filenames {
    go func() {
        thumbnail.ImageFile(f) // NOTE: incorrect!
        // ...
    }()
}

迴憶一下之前在5.6.1節中,匿名函數中的循環變量快照問題。上面這個單獨的變量f是被所有的匿名函數值所共享,且會被連續的循環迭代所更新的。當新的goroutine開始執行字面函數時,for循環可能已經更新了f併且開始了另一輪的迭代或者(更有可能的)已經結束了整個循環,所以當這些goroutine開始讀取f的值時,它們所看到的值已經是slice的最後一個元素了。顯式地添加這個參數,我們能夠確保使用的f是當go語句執行時的“當前”那個f。

如果我們想要從每一個worker goroutine往主goroutine中返迴值時該怎麽辦呢?當我們調用thumbnail.ImageFile創建文件失敗的時候,它會返迴一個錯誤。下一個版本的makeThumbnails會返迴其在做拉伸操作時接收到的第一個錯誤:

// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
    errors := make(chan error)

    for _, f := range filenames {
        go func(f string) {
            _, err := thumbnail.ImageFile(f)
            errors <- err
        }(f)
    }

    for range filenames {
        if err := <-errors; err != nil {
            return err // NOTE: incorrect: goroutine leak!
        }
    }

    return nil
}

這個程序有一個微秒的bug。當它遇到第一個非nil的error時會直接將error返迴到調用方,使得沒有一個goroutine去排空errors channel。這樣剩下的worker goroutine在向這個channel中發送值時,都會永遠地阻塞下去,併且永遠都不會退出。這種情況叫做goroutine洩露(§8.4.4),可能會導致整個程序卡住或者跑出out of memory的錯誤。

最簡單的解決辦法就是用一個具有合適大小的buffered channel,這樣這些worker goroutine向channel中發送測向時就不會被阻塞。(一個可選的解決辦法是創建一個另外的goroutine,當main goroutine返迴第一個錯誤的同時去排空channel)

下一個版本的makeThumbnails使用了一個buffered channel來返迴生成的圖片文件的名字,附帶生成時的錯誤。

// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
    type item struct {
        thumbfile string
        err       error
    }

    ch := make(chan item, len(filenames))
    for _, f := range filenames {
        go func(f string) {
            var it item
            it.thumbfile, it.err = thumbnail.ImageFile(f)
            ch <- it
        }(f)
    }

    for range filenames {
        it := <-ch
        if it.err != nil {
            return nil, it.err
        }
        thumbfiles = append(thumbfiles, it.thumbfile)
    }

    return thumbfiles, nil
}

我們最後一個版本的makeThumbnails返迴了新文件們的大小總計數(bytes)。和前面的版本都不一樣的一點是我們在這個版本里沒有把文件名放在slice里,而是通過一個string的channel傳過來,所以我們無法對循環的次數進行預測。

爲了知道最後一個goroutine什麽時候結束(最後一個結束併不一定是最後一個開始),我們需要一個遞增的計數器,在每一個goroutine啟動時加一,在goroutine退出時減一。這需要一種特殊的計數器,這個計數器需要在多個goroutine操作時做到安全併且提供提供在其減爲零之前一直等待的一種方法。這種計數類型被稱爲sync.WaitGroup,下面的代碼就用到了這種方法:

// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup // number of working goroutines
    for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb) // OK to ignore error
            sizes <- info.Size()
        }(f)
    }

    // closer
    go func() {
        wg.Wait()
        close(sizes)
    }()

    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

註意Add和Done方法的不對策。Add是爲計數器加一,必須在worker goroutine開始之前調用,而不是在goroutine中;否則的話我們沒辦法確定Add是在”closer” goroutine調用Wait之前被調用。併且Add還有一個參數,但Done卻沒有任何參數;其實它和Add(-1)是等價的。我們使用defer來確保計數器卽使是在出錯的情況下依然能夠正確地被減掉。上面的程序代碼結構是當我們使用併發循環,但又不知道迭代次數時很通常而且很地道的寫法。

sizes channel攜帶了每一個文件的大小到main goroutine,在main goroutine中使用了range loop來計算總和。觀察一下我們是怎樣創建一個closer goroutine,併讓其等待worker們在關閉掉sizes channel之前退出的。兩步操作:wait和close,必須是基於sizes的循環的併發。考慮一下另一種方案:如果等待操作被放在了main goroutine中,在循環之前,這樣的話就永遠都不會結束了,如果在循環之後,那麽又變成了不可達的部分,因爲沒有任何東西去關閉這個channel,這個循環就永遠都不會終止。

圖8.5 表明了makethumbnails6函數中事件的序列。縱列表示goroutine。窄線段代表sleep,粗線段代表活動。斜線箭頭代表用來同步兩個goroutine的事件。時間向下流動。註意main goroutine是如何大部分的時間被喚醒執行其range循環,等待worker發送值或者closer來關閉channel的。

練習 8.4: 脩改reverb2服務器,在每一個連接中使用sync.WaitGroup來計數活躍的echo goroutine。當計數減爲零時,關閉TCP連接的寫入,像練習8.3中一樣。驗證一下你的脩改版netcat3客戶端會一直等待所有的併發“喊叫”完成,卽使是在標準輸入流已經關閉的情況下。

練習 8.5: 使用一個已有的CPU綁定的順序程序,比如在3.3節中我們寫的Mandelbrot程序或者3.2節中的3-D surface計算程序,併將他們的主循環改爲併發形式,使用channel來進行通信。在多核計算機上這個程序得到了多少速度上的改進?使用多少個goroutine是最合適的呢?