位元詩人 [Golang] 程式設計教學:撰寫共時性 (Concurrency) 程式

Golang共時性
Facebook Twitter LinkedIn LINE Skype EverNote GMail Yahoo Email

前言

由於 CPU 的時脈已經到物理上限,現在的硬體都往多核心、多 CPU 發展。同樣地,單一的大型伺服器相當昂貴,而且擴充量有限,使用多台主機組成的叢集 (cluster) 則相對易於擴充。然而,若程式碼沒有使用共時性 (concurrency) 的特性來撰寫,則無法真正發揮平行處理 (parallel computing) 所帶來的效能提升。

Go 主要的特色之一,就在於其對共時性程式的支援;大部分程式語言以函式庫來支援共時性程式,但 Go 將其內建在語法中。Go 的並時性程式有兩種,一種是以 CSP (communicating sequential processes) 模型的並時性程式,一種是傳統的多執行緒 (multi-thread) 程式。由於 Go 將 CSP 模型內建在語法中,通常建議使用這些內建功能來寫共時性程式。

goroutine 是輕量級執行緒 (lightweight thread)

大部分的程式語言,像是 C++ 或 Java 等,以執行緒 (thread) 做為並行程式的單位。Go 程式以 goroutine 做為並行執行的程式碼區塊,goroutine 類似於執行緒,但更輕量,一次啟動數百甚至數千個以上的 goroutine 也不會占用太多記憶體。要使用 goroutine,在函式前加上 go 關鍵字即可。以下為使用 goroutine 的實例:

package main

import (
    "log"
    "os"
    "sync"
)

func main() {
    // A goroutine-safe console printer.
    logger := log.New(os.Stdout, "", 0)

    // Sync between goroutines.
    var wg sync.WaitGroup

    // Add goroutine 1.
    wg.Add(1)
    go func() {
        defer wg.Done()
        logger.Println("Print from goroutine 1")
    }()

    // Add goroutine 2.
    wg.Add(1)
    go func() {
        defer wg.Done()
        logger.Println("Print from goroutine 2")
    }()

    // Add goroutine 3.
    wg.Add(1)
    go func() {
        defer wg.Done()
        logger.Println("Print from goroutine 3")
    }()

    logger.Println("Print from main")

    // Wait all goroutines.
    wg.Wait()
}

在本例中,我們建立了三個 goroutine。由於 goroutine 和主程式並時執行,若我們沒有使用 WaitGroup 將程式同步化,本程式在主程式結束時即提早結束,因此,我們宣告 wg 變數,在程式尾端等待所有 goroutine 執行結束。

如果讀者多執行幾次本程式,會發現每次印出字串的順序不同。並時性程式和傳統的循序式程式的思維不太一樣,執行並時性程式時無法保證程式運行的先後順序,需注意。

利用 channel 在 goroutine 間傳遞資料

上述的 goroutine 內的資料是各自獨立的,而 Go 用 channel 在不同並行程式間傳遞資料。如下例:

package main

import "fmt"

func main() {
    // Create a channel
    message := make(chan string)

    // Init a goroutine.
    go func() {
        // Send some data into the channel.
        message <- "Hello from channel"
    }()

    // Receive the data from the channel.
    msg := <-message
    fmt.Println(msg)
}

由於通道在傳輸時,會阻塞 (blocking) 程式的行進,在此處,我們不需要另外設置 WaitGroup。

設置固定大小的 buffered channel

前述的 channel 是無緩衝的。我們也可以設置有緩衝的 (buffered) channel,buffered channel 有固定的大小,這樣就不需等待其他的 goroutine,可以直接傳送資料。

package main

import (
    "log"
    "os"
    "sync"
)

func main() {
    // A goroutine-safe console printer.
    logger := log.New(os.Stdout, "", 0)

    // Sync among all goroutines.
    var wg sync.WaitGroup

    // Make a buffered channel.
    ch := make(chan int, 10)

    for i := 1; i <= 10; i++ {
        ch <- i
        wg.Add(1)
        go func() {
            defer wg.Done()
            logger.Println("Print from goroutine ", <-ch)
        }()
    }

    logger.Println("Print from main")
    wg.Wait()
}

指定 channel 的方向

我們在設置 channel 時,可指定其方向,如下例:

package main

import "fmt"

func ping(pings chan<- string, msg string) {
    pings <- msg
}

func pong(pings <-chan string, pongs chan<- string) {
    msg := <-pings
    pongs <- msg
}

func main() {
    pings := make(chan string, 1)
    pongs := make(chan string, 1)
    ping(pings, "passed message")
    pong(pings, pongs)
    fmt.Println(<-pongs)
}

關閉 channel

若不用 channel 時,可用 close 函式將 channel 關閉,如下例:

package main

import "fmt"

func main() {
    ch := make(chan int, 4)
    ch <- 2
    ch <- 4
    close(ch)
    // ch <- 6 // panic, send on closed channel

    fmt.Println(<-ch)
    fmt.Println(<-ch)
    fmt.Println(<-ch) // closed, returns zero value for element
}

使用 select 敘述在多個 channel 間做選擇

透過 select,我們可以在多個 channel 中做選擇。如下例:

package main

import "time"
import "fmt"

func main() {
    // For our example we'll select across two channels.
    c1 := make(chan string)
    c2 := make(chan string)

    // Each channel will receive a value after some amount
    // of time, to simulate e.g. blocking RPC operations
    // executing in concurrent goroutines.
    go func() {
        time.Sleep(time.Second * 1)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(time.Second * 1)
        c2 <- "two"
    }()

    // We'll use `select` to await both of these values
    // simultaneously, printing each one as it arrives.
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}

利用 channel 撰寫 generator

利用 channel 可以撰寫共時執行的 generator,如下例:

package main

import (
    "fmt"
    "strings"
)

func main() {
    data := []string{
        "The yellow fish swims slowly in the water",
        "The brown dog barks loudly after a drink from its water bowl",
        "The dark bird of prey lands on a small tree after hunting for fish",
    }

    histogram := make(map[string]int)
    wordsCh := make(chan string)

    go func() {
        defer close(wordsCh)

        for _, line := range data {
            words := strings.Split(line, " ")

            for _, word := range words {
                word = strings.ToLower(word)
                wordsCh <- word
            }
        }
    }()

    for {
        word, opened := <- wordsCh
        if !opened {
            break
        }
        histogram[word]++
    }

    for k, v := range histogram {
        fmt.Println(fmt.Sprintf("%s\t(%d)", k, v))
    }
}

利用 mutex 將共時性程式同步化

除了前述的 goroutine 和 channel 外,Go 也提供較傳統的 Mutex。在共時性程式中,mutex 會將某一段程式暫時鎖住,避免兩個共時程式競爭同一塊資料。以下範例節錄自一個假想的向量類別:

package vector

import (
    "sync"
)

type IVector interface {
    Len() int
    GetAt(int) float64
    SetAt(int, float64)
}

type Vector struct {
    sync.RWMutex
    vec []float64
}

func New(args ...float64) IVector {
    v := new(Vector)
    v.vec = make([]float64, len(args))

    for i, e := range args {
        v.SetAt(i, e)

    }

    return v
}

// The length of the vector
func (v *Vector) Len() int {
    return len(v.vec)
}

// Getter
func (v *Vector) GetAt(i int) float64 {
    if i < 0 || i >= v.Len() {
        panic("Index out of range")
    }

    return v.vec[i]
}

// Setter
func (v *Vector) SetAt(i int, data float64) {
    if i < 0 || i >= v.Len() {
        panic("Index out of range")
    }

    v.Lock()
    v.vec[i] = data
    v.Unlock()
}

/ Vector algebra delegating to function object.
// This method delegates vector algebra to function object set by users, making
// it faster then these methods relying on reflection.
func Apply(v1 IVector, v2 IVector, f func(float64, float64) float64) IVector {
    _len := v1.Len()

    if !(_len == v2.Len()) {
        panic("Unequal vector size")
    }

    out := WithSize(_len)

    var wg sync.WaitGroup

    for i := 0; i < _len; i++ {
        wg.Add(1)

        go func(v1 IVector, v2 IVector, out IVector, f func(float64, float64) float64, i int) {
            defer wg.Done()

            out.SetAt(i, f(v1.GetAt(i), v2.GetAt(i)))
        }(v1, v2, out, f, i)
    }

    wg.Wait()

    return out
}

當我們要將資料存入內部的 float64 切片時,透過 Mutex 將切片暫時鎖住,避免多個程式同時存取切片。在此情形外,資料都是各自獨立的,所以,可以開啟多個 goroutine 進行並行運算。

關於作者

身為資訊領域碩士,位元詩人 (ByteBard) 認為開發應用程式的目的是為社會帶來價值。如果在這個過程中該軟體能成為永續經營的項目,那就是開發者和使用者雙贏的局面。

位元詩人喜歡用開源技術來解決各式各樣的問題,但必要時對專有技術也不排斥。閒暇之餘,位元詩人將所學寫成文章,放在這個網站上和大家分享。