並行性

通信による共有

並行プログラミングは大きなトピックであり、ここではGo特有のハイライトのためのスペースしかありません。

多くの環境での並行プログラミングは、共有変数への正しいアクセスを実装するために必要な微妙さによって困難になります。Goは異なるアプローチを推奨し、共有値をチャネルで受け渡し、実際には別々の実行スレッドによって能動的に共有されることはありません。任意の時点で値にアクセスできるのは1つのゴルーチンのみです。設計上、データ競合は発生しません。この考え方を促進するために、スローガンに要約しています:

メモリを共有することで通信しないでください。代わりに、通信することでメモリを共有してください。

このアプローチは行き過ぎることもあります。参照カウントは、たとえば、整数変数の周りにミューテックスを置くことで最もよく行われるかもしれません。しかし、高レベルなアプローチとして、チャネルを使用してアクセスを制御することで、明確で正しいプログラムを書きやすくします。

このモデルについて考える一つの方法は、1つのCPUで実行される典型的な単一スレッドプログラムを考えることです。同期プリミティブは必要ありません。今度は、もう一つのそのようなインスタンスを実行します。それも同期は必要ありません。今、この2つを通信させます。通信が同期装置であれば、他の同期の必要はまだありません。たとえば、Unixパイプラインはこのモデルに完全に適合します。GoのConcurrencyアプローチはHoareのCommunicating Sequential Processes(CSP)に由来しますが、Unixパイプの型安全な汎化とも見ることができます。

ゴルーチン

それらはゴルーチンと呼ばれます。既存の用語(スレッド、コルーチン、プロセスなど)は不正確な含意を伝えるからです。ゴルーチンには単純なモデルがあります:同じアドレス空間内の他のゴルーチンと同時に実行される関数です。軽量で、スタック空間の割り当て以上のコストはほとんどかかりません。そして、スタックは小さく始まるので、安価で、必要に応じてヒープストレージを割り当て(および解放)することで成長します。

ゴルーチンは複数のOSスレッドに多重化されるため、1つがI/Oを待っているなどでブロックされても、他が実行を続けます。その設計は、スレッドの作成と管理の複雑さの多くを隠します。

関数またはメソッド呼び出しの前にgoキーワードを付けて、新しいゴルーチンで呼び出しを実行します。呼び出しが完了すると、ゴルーチンは静かに終了します。(効果はUnixシェルの&記法でコマンドをバックグラウンドで実行するのと似ています。)

go list.Sort()  // list.Sortを並行して実行し、待機しません

関数リテラルは、ゴルーチン呼び出しで便利になることがあります。

func Announce(message string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println(message)
    }()  // 括弧に注意 - 関数を呼び出す必要があります
}

Goでは、関数リテラルはクロージャです:実装は、関数によって参照される変数がアクティブである限り存続することを保証します。

これらの例はあまり実用的ではありません。関数が完了を知らせる方法がないからです。そのためには、チャネルが必要です。

チャネル

マップと同様に、チャネルはmakeで割り当てられ、結果の値は基になるデータ構造への参照として機能します。オプションの整数パラメータが提供されると、チャネルのバッファサイズを設定します。デフォルトはゼロで、バッファなしまたは同期チャネル用です。

ci := make(chan int)            // バッファなしの整数チャネル
cj := make(chan int, 0)         // バッファなしの整数チャネル
cs := make(chan *os.File, 100)  // Fileへのポインタのバッファ付きチャネル

バッファなしチャネルは通信(値の交換)と同期(2つの計算(ゴルーチン)が既知の状態にあることを保証)を組み合わせます。

チャネルを使用する多くのいいイディオムがあります。始めるための1つを次に示します。前のセクションでは、バックグラウンドでソートを開始しました。チャネルによって、開始しているゴルーチンがソートの完了を待つことができます。

c := make(chan int)  // チャネルを割り当てます
// ゴルーチンでソートを開始します。完了したら、チャネルで信号を送ります
go func() {
    list.Sort()
    c <- 1  // 信号を送ります。値は重要ではありません
}()
doSomethingForAWhile()
<-c   // ソートの完了を待ちます。送信された値は破棄します

レシーバーは受信するデータがあるまで常にブロックします。チャネルがバッファなしの場合、送信者はレシーバーが値を受信するまでブロックします。チャネルにバッファがある場合、送信者は値がバッファにコピーされるまでのみブロックします。バッファがいっぱいの場合、これは一部のレシーバーが値を取得するまで待つことを意味します。

バッファ付きチャネルは、たとえば、スループットを制限するためのセマフォのように使用できます。この例では、着信リクエストはhandleに渡され、チャネルに値を送信し、リクエストを処理してから、チャネルから値を受信して、次のコンシューマーのために「セマフォ」を準備します。チャネルバッファの容量は、processへの同時呼び出し数を制限します。

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // アクティブキューの排出を待ちます
    process(r)  // 時間がかかる場合があります
    <-sem       // 完了。次のリクエストの実行を有効にします
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // handleの完了を待ちません
    }
}

MaxOutstandingハンドラがprocessを実行すると、それ以上はいっぱいになったチャネルバッファに送信しようとしてブロックされ、既存のハンドラの1つが完了してバッファから受信するまで続きます。

ただし、この設計には問題があります:Serveは着信するすべてのリクエストに対して新しいゴルーチンを作成しますが、MaxOutstanding個のみが任意の瞬間に実行できます。結果として、リクエストが速すぎると、プログラムは無制限のリソースを消費する可能性があります。Serveを変更してゴルーチンの作成をゲートすることで、この欠陥に対処できます:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}

(Go 1.22以前のバージョンでは、このコードにはバグがあることに注意してください:ループ変数はすべてのゴルーチン間で共有されます。詳細についてはGo wikiを参照してください。)

リソースをうまく管理する別のアプローチは、リクエストチャネルからすべて読み取る固定数のhandleゴルーチンを開始することです。ゴルーチンの数はprocessへの同時呼び出し数を制限します。このServe関数は、終了するように指示されるチャネルも受け入れます。ゴルーチンを起動した後、そのチャネルからの受信をブロックします。

func handle(queue chan *Request) {
    for r := range queue {
        process(r)
    }
}

func Serve(clientRequests chan *Request, quit chan bool) {
    // ハンドラを開始します
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests)
    }
    <-quit  // 終了するように指示されるまで待ちます
}

チャネルのチャネル

Goの最も重要な性質の1つは、チャネルが第一級の値であり、他の値と同様に割り当てられ、受け渡されることです。この性質の一般的な使用は、安全で並列な逆多重化を実装することです。

前のセクションの例では、handleはリクエストの理想化されたハンドラでしたが、処理していた型を定義していませんでした。その型が応答するチャネルを含む場合、各クライアントは回答のための独自のパスを提供できます。型Requestのスキーマ的定義を次に示します。

type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}

クライアントは関数とその引数、およびリクエストオブジェクト内の回答を受信するチャネルを提供します。

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// リクエストを送信します
clientRequests <- request
// レスポンスを待ちます
fmt.Printf("answer: %d\n", <-request.resultChan)

サーバー側では、ハンドラ関数のみが変更されます。

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}

明らかに、これを現実的にするためにはもっと多くのことが必要ですが、このコードは、レート制限された、並列な、非ブロッキングRPCシステムのフレームワークであり、ミューテックスは見当たりません。

並列化

これらのアイデアのもう一つの応用は、複数のCPUコアにわたって計算を並列化することです。計算が独立に実行できる別々の部分に分割できる場合、それは並列化でき、各部分が完了したときに合図するチャネルで並列化できます。

高価な演算をアイテムのベクトルで実行し、各アイテムでの演算の値が独立している、この理想化された例で言いましょう。

type Vector []float64

// v[i]、v[i+1] ... v[n-1]まで演算を適用します
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for ; i < n; i++ {
        v[i] += u.Op(v[i])
    }
    c <- 1    // この部分が完了したことを合図します
}

ループで部分を独立に起動し、CPUごとに1つずつ。それらは任意の順序で完了できますが、重要ではありません。すべてのゴルーチンを起動した後、チャネルを排出して完了信号をカウントします。

const numCPU = 4 // CPUコア数

func (v Vector) DoAll(u Vector) {
    c := make(chan int, numCPU)  // バッファリングはオプションですが賢明です
    for i := 0; i < numCPU; i++ {
        go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
    }
    // チャネルを排出します
    for i := 0; i < numCPU; i++ {
        <-c    // 1つのタスクの完了を待ちます
    }
    // すべて完了
}

numCPUの定数値を作成するのではなく、ランタイムに適切な値を尋ねることができます。関数runtime.NumCPUはマシン内のハードウェアCPUコアの数を返すため、次のように書くことができます:

var numCPU = runtime.NumCPU()

また、関数runtime.GOMAXPROCSもあり、Goプログラムが同時に実行できるユーザー指定のコア数を報告(または設定)します。デフォルトはruntime.NumCPUの値ですが、同様に名前付けされたシェル環境変数を設定するか、正の数で関数を呼び出すことでオーバーライドできます。ゼロで呼び出すと値をクエリするだけです。したがって、ユーザーのリソース要求を尊重したい場合は、次のように書くべきです:

var numCPU = runtime.GOMAXPROCS(0)

並行性(独立して実行されるコンポーネントとしてプログラムを構造化すること)と並列性(効率のために複数のCPUで並列に計算を実行すること)のアイデアを混同しないように注意してください。Goの並行性機能により、一部の問題を並列計算として構造化しやすくなりますが、Goは並列言語ではなく並行言語であり、すべての並列化問題がGoのモデルに適合するわけではありません。違いについての議論については、このブログ投稿で引用されている講演を参照してください。

リーキーバッファ

並行プログラミングのツールは、非並行のアイデアでさえ表現しやすくすることができます。RPCパッケージから抽象化された例を次に示します。クライアントゴルーチンは、おそらくネットワークなどのソースからデータを受信してループします。バッファの割り当てと解放を避けるために、フリーリストを保持し、バッファ付きチャネルを使用してそれを表現します。チャネルが空の場合、新しいバッファが割り当てられます。メッセージバッファの準備ができると、serverChanでサーバーに送信されます。

var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
    for {
        var b *Buffer
        // 利用可能であればバッファを取得し、そうでなければ割り当てます
        select {
        case b = <-freeList:
            // 1つ取得しました。もう何もすることはありません
        default:
            // 空いているものがないので、新しいものを割り当てます
            b = new(Buffer)
        }
        load(b)              // ネットから次のメッセージを読み取ります
        serverChan <- b      // サーバーに送信します
    }
}

サーバーループは各メッセージをクライアントから受信し、処理して、バッファをフリーリストに戻します。

func server() {
    for {
        b := <-serverChan    // 作業を待ちます
        process(b)
        // 余裕があればバッファを再利用します
        select {
        case freeList <- b:
            // バッファがフリーリストに。もう何もすることはありません
        default:
            // フリーリストがいっぱいです。そのまま続けます
        }
    }
}

クライアントはfreeListからバッファを取得しようとします。利用できない場合、新しいものを割り当てます。サーバーのfreeListへの送信は、リストがいっぱいでない限りbをフリーリストに戻し、その場合バッファはガベージコレクタによって回収される床に落とされます。(selectステートメントのdefault句は、他のケースが準備されていないときに実行されます。つまり、selectは決してブロックしません。)この実装は、バッファ付きチャネルとガベージコレクタに依存した簿記でわずか数行でリーキーバケット フリーリストを構築します。