[Golang] Goroutine を支える技術

2020-08-05

この記事は Go7 Advent Calendar 2019 の 23日目の記事です。

今回は Go言語による並行処理 の内容を理解するためのアウトプットです 🐥 もう少し見直してから公開しようと思ってたんですが、23日目の枠が偶然にも空いていたのでもう公開してしまいます😈

これで4枠目です。ごめんなさい、そしてありがとう。

久々に結構長めなので余裕のある方だけどうぞ。

目次

Goroutine

Goroutine(ゴルーチン) は 並行処理を扱うためのコルーチンです。 普通のコルーチンとは違って処理の割り込みや再開を開発者がプログラム上から制御することはできません。

ゴルーチンはM:Nモデルと呼ばれる複数(N)のカーネルスレッドに 複数(M)のユーザスレッドを対応させたものにスケジューリングされるため、 複数のCPUコアを扱うことができます。

各カーネルスレッドは ワークデック (work deque) と呼ばれる両端から取り出し可能なキューを持ちます。 以下のように働きます。

  • 新たにゴルーチンが発行されると自身のワークデックの最後尾に追加する

  • スレッドがアイドル状態のとき、他のワークデックの前側からゴルーチンを半分奪って自身のデックに追加する

  • 処理がゴルーチンの合流地点に達したら自身のワークデックの最後尾からゴルーチンを取り出して実行する

    • 最後尾に追加したタスクは親ゴルーチンの合流を完了させるために必要になることが多く、早く合流させることはプログラムの高速化につながる

    • もし自身のワークデックが空なら、アイドル状態になるか他のワークデックの前側からゴルーチンを半分奪って自身のデックに追加する

これはワークスティーリングと呼ばれる仕組みです。

ゴルーチン自体はとても軽量(一つにつき数KB)なため、8GB RAMで約300万、512GB RAM では 約2億のゴルーチンを生成することができ、 OSスレッドに比べて90%以上起動が速いとされています(出典: Go言語による並行処理)

Golang はこの Goroutine を生成するための go 文や、 ゴルーチン間で(チャネルを介して)通信を行うためのアロー演算子を構文として持ちます。

go statement

go 文の後に関数呼び出しを書くとゴルーチンが作られ、関数はそこで実行されます。

Goのプロセスは最低でも一つのゴルーチン(すべての基底となるメインゴルーチン)を持ち、 それから派生するようにほかのゴルーチンが作られます。

基本的にゴルーチンは同期しない限り生成 のゴルーチンの状態に依らず処理が完了するまで突き進みますが、 メインゴルーチン(プロセス自身)が終了したら、生成されたゴルーチンも終了します。

以下は親子(2対)のゴルーチンに対して指定した秒数スリープしたあとにプリントするプログラムです。

goroutine.go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

func main() {
	go func() {
		d, _ := strconv.Atoi(os.Args[2])
		time.Sleep(time.Second * time.Duration(d))
		fmt.Println("child", d)
	}()
	d, _ := strconv.Atoi(os.Args[1])
	time.Sleep(time.Second * time.Duration(d))
	fmt.Println("parent", d)
}
# 親が後に終わるようにスリープ
$ go run goroutine.go 2 1
child 1
parent 2

# 親が先に終わるようにスリープ
$ go run goroutine.go 1 2
parent 1

Address space

ゴルーチンは生成元のゴルーチンとアドレス空間を共有します 🤼‍♀️

nesting.go
package main

import (
	"fmt"
	"time"
)

var a = 100

func main() {
	b := 200
	go func() {
		fmt.Printf("a:%d (%p) b:%d (%p) in child\n", a, &a, b, &b)
		a += 1
		b += 1

		go func() {
			fmt.Printf("a:%d (%p) b:%d (%p) in grandchild\n", a, &a, b, &b)
			a += 2
			b += 2
		}()
	}()
	time.Sleep(time.Second * 1)
	fmt.Printf("a:%d (%p) b:%d (%p) in parent\n", a, &a, b, &b)
}
$ go run nesting.go
a:100 (0x1167280) b:200 (0xc00008a008) in child
a:101 (0x1167280) b:201 (0xc00008a008) in grandchild
a:103 (0x1167280) b:203 (0xc00008a008) in parent

グローバルスコープも(外側の)関数スコープも、生成されたゴルーチンからは読み書き可能なのです。

ただし、 for 文の中でゴルーチンを発行するときは注意が必要です。

以下のプログラムはいずれもループ内でゴルーチンを発行しています。

gofor.go
package main

import (
	"fmt"
	"time"
)

func main() {
	for i := 0; i < 3; i++ {
		go func() {
			fmt.Println("i in goroutine:", i)
		}()
		fmt.Println("i:", i)
	}
	time.Sleep(1 * time.Second)
}

goforarg.go
package main

import (
	"fmt"
	"time"
)

func main() {
	for i := 0; i < 3; i++ {
		go func(j int) {
			fmt.Println("i in goroutine:", j)
		}(i)
		fmt.Println("i:", i)
	}
	time.Sleep(1 * time.Second)
}
$ go run gofor.go
i: 0
i: 1
i: 2
i in goroutine: 3
i in goroutine: 3
i in goroutine: 3
$ go run goforarg.go
i: 0
i: 1
i: 2
i in goroutine: 0
i in goroutine: 2
i in goroutine: 1

右側のプログラムのように 0, 1, 2 が一度ずつ出現してほしかったんですが、 左側のプログラムはすべて 3 になってしまいました 😵

ゴルーチンの生成が終わる頃には i がループの終了条件に達し、 生成されたすべてのゴルーチンが 同じ i を参照してしまうため、このようなことが起こります。 とはいえタイミングの問題でもあるので状況によっては同じ結果にはならないかもしれません。

右側の実装例のようにゴルーチン内で参照する変数を引数として渡したり、 後述するチャネルを使って渡すことでこのような問題を回避できます🤸🏻‍♀️

Channel

チャネルは ゴルーチン間でデータを通信するための機能です。 Golang ではゴルーチンのデータのやり取りは このチャネルを介して行うことが強く推奨されています。

備考

CSP (Communicating Sequential Processes) と呼ばれる並行処理を表現するためのパラダイムがあります。

CSP は独立したプロセス間でデータのやり取りをするための記述表現を提供します。 ここでいうプロセスは並行処理の概念で、Golang でいうなら Goroutine が該当します。

CSP におけるメッセージパッシングを Golang で実現するための部品が チャネル です。 実際、CSPではプロセスの入出力は言語のプリミティブと考える必要があるということで、 Golangのチャネルはまさにそれを体現しています。

(詳しく知りたい方はリンク先を参照したりググったりしてください)

チャネルは「型」「方向」「バッファ」を持ちます。

  • チャネルでやり取りできるのはプリミティブな型だけでなく、構造体やスライス、マップのような複合的な型もやり取り可能です。

  • 方向は左向きの矢印 (<-) で表現し、 送信の場合はチャネルの 右側 、 受信の場合はチャネルの 左側 に矢印を置きます。右向きの矢印はありません。

チャネルは chan という型で表され、実際に作るときは make 関数を用います。 make 関数には チャネルの型バッファサイズ を指定します。

バッファサイズとはいわばキューの長さであり、「送信されたデータをいくつ保持しておくか」を表します。 送信されたデータがバッファに収まらず、他のゴルーチンから受信されていないと処理がブロックされます。

チャネルのバッファサイズだけが異なる同じ内容のプログラムを2つ書いてみました。

channel.go
package main

import (
	"fmt"
)

func add(ch chan []int, i int) {
	s := <-ch
	ch <- append(s, i)
}

func main() {
	ch := make(chan []int)

	go add(ch, 3)
	ch <- []int{1, 2}

	fmt.Println(<-ch)
}

channel2.go
package main

import (
	"fmt"
)

func add(ch chan []int, i int) {
	s := <-ch
	ch <- append(s, i)
}

func main() {
	ch := make(chan []int, 1)

	go add(ch, 3)
	ch <- []int{1, 2}

	fmt.Println(<-ch)
}
$ go run channel.go
[1 2 3]
$ go run channel2.go
[1 2]

何がこの違いを生み出しているかというと、「チャネルを読み出すタイミング」です。 これらのプログラムでチャネルを読み出しているのは s := <- ch, fmt.Println(<-ch) の2箇所です。

左のプログラムではaddゴルーチン内の s := <-ch が先に実行されます。 バッファサイズが 0 のため、addゴルーチンでチャネルを読み出すまで ch <- []int{1, 2} でブロックされ続けます。 その後 addゴルーチンで書き込まれた [1 2 3]fmt.Println(<-ch) が読み出して描画するというわけです。

右のプログラムではmainゴルーチン内の fmt.Println(<-ch) が先に実行されます。 バッファサイズが 1 のため、 ch <- []int{1, 2} はブロックされずに、 fmt.Println(<-ch) まで届き、直前で書き込んだ [1 2] を描画します。 addゴルーチンはブロックされたままです。 正確には fmt.Println(<-ch) が先に実行されるのはあくまでタイミング的なものであって、保証されているわけではありません。

このようにチャネルのバッファサイズを変えるだけで処理順が変わり 私達の思惑を裏切る結果になることがあるため、チャネルのブロックを当てにして実行制御を組むときは注意が必要です。

ちなみに最初のプログラムでは time.Sleep を使って実行順を調整していましたが、 プログラムの実行時間は端末のスペックや状況によって変動するため、これも良い方法とは言えません。

備考

上記で使ったのは両方向のチャネルでしたが、送信専用、受信専用のチャネル型を定義できます。

大抵の場合、仮引数の型として「送信専用」または「受信専用」なチャネル型を定義し、その関数内でだけ強制するために利用します。

make 関数を使って送信(あるいは受信)専用のチャネルを作ることはできますが、 プログラム全体を通して「送信だけ」「受信だけ」しかできないチャネルにはほとんど存在意義がありません。

スタブとして使えるくらいかな?

もしかして他にも意味があるかもしれないと思っていろいろ調べてみましたが、 make で作った専用チャネルを有効に活用している文献やコードは見つけられませんでした。 特別な使い方があるよって方は教えて下さい。

close

チャネルは close 関数によって閉じることができます。

閉じられたチャネルからデータを読み込むことはできますが 書き込むと(仮にバッファが空いていても)パニックが発生します。

channelclose.go
package main

import (
	"fmt"
)

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	i, more := <-ch
	fmt.Println(i, more)
	close(ch)
	i, more = <-ch
	fmt.Println(i, more)
	i, more = <-ch
	fmt.Println(i, more)
	ch <- 3
}
$ go run channelclose.go
1 true
2 true
0 false
panic: send on closed channel

goroutine 1 [running]:
main.main()
        channelclose.go:18 +0x2e9
exit status 2

通常、バッファにデータがない状態でチャネルからデータを読み出すとブロックされますが、 クローズされたチャネルからデータを読み出すとブロックされずにチャネルのゼロ値が得られます。

この値がバッファから読み出された値かどうかを区別するためには、第2返却値(上記でいうと ok)を見ればわかります。

さて、気になるのは「チャネルのクローズを忘れるとメモリリークを引き起こすか」ということでしょう。 結論から言うとチャネルはGCの対象なのでメモリリークの直接の原因には繋がりません。

クローズされたチャネルの読み込みはブロックされないので、 そのチャネルが原因でゴルーチンをゾンビ化させることはありません。

select

Go言語による並行処理 によれば select文はチャネルをまとめる糊だと言っています。 チャネルが読み書き可能になるのを待って処理を分岐しようとしたら select文は欠かせません。

select文は switch文のように caseにハンドリングしたいチャネルを列挙し、 読み書き可能になったチャネルのcase文に対応する処理を実行します。

以下は別々に数値を合計したゴルーチンから値を受け取り、 総乗用のゴルーチンに投げて受け取るプログラムです。

(1 + 2 + 3 + 4) * (5 + 6 + 7 + 8 + 9) * (10 + 11) = 7350 が期待する結果です。

select.go
package main

import (
	"fmt"
	"time"
)

func sum(receiver chan int, nums ...int) {
	total := 0
	for _, num := range nums {
		time.Sleep(1 * time.Second)
		total += num
	}
	receiver <- total
}

func product(receiver chan int, sender chan []int) {
	// 6秒待ってからチャネルからレシーブする
	time.Sleep(6 * time.Second)
	total := 1
	for _, num := range <-sender {
		total *= num
	}
	receiver <- total
}

func main() {
	receiver1 := make(chan int)
	receiver2 := make(chan int)
	receiver3 := make(chan int)
	receiver4 := make(chan int)
	sender := make(chan []int)

	go sum(receiver1, 1, 2, 3, 4)
	go sum(receiver2, 5, 6, 7, 8, 9)
	go sum(receiver3, 10, 11)
	go product(receiver4, sender)

	numbers := make([]int, 0)

	for n := 5; n > 0; {
		select {
		case total := <-receiver1:
			fmt.Println("sum1:", total)
			numbers = append(numbers, total)
			n--
		case total := <-receiver2:
			fmt.Println("sum2:", total)
			numbers = append(numbers, total)
			n--
		case total := <-receiver3:
			fmt.Println("sum3:", total)
			numbers = append(numbers, total)
			n--
		case sender <- numbers:
			n--
		case total := <-receiver4:
			fmt.Println("product:", total)
			n--
		}
	}
}
$ go run select.go
sum3: 21
sum1: 10
sum2: 35
product: 7350

main関数内のチャネル操作はすべて select にまとめました。

チャネルの対する操作は読み書きどちらも select で待ち受けられますが、 一つのcase文に指定できるチャネルは一つだけなので注意してください。 つまり、複数のチャネルの状態を見て処理を分岐したい場合は別途状態管理用の変数が必要になってきます。

今回はすべての処理を合流させるのに単純に数値の減算(n--)で表現しています。

sync.Mutex

Mutex は並行処理で最も基本的なリソース制御、ロック(Lock)を行うためのものです。

ロック状態のMutexにたいして、ロックを取得しようとするとその処理はブロックされます。

Mutex の状態は「ロックされている」か「ロックされていない」かのいずれかで、 メソッドは Lock(), Unlock() しかありません。

すでにおわかりの通り、 Lock() でロックを取得、 Unlock() で解放します。 Unlock() は defer で後処理として呼び出すことが多いです。

Mutex を使う場合と使わない場合に分けて、足し算の結果を比較してみましょう。

100 + 1 + 2 + 3106 になるのが期待する結果です。

nomutex.go
package main

import (
	"fmt"
)

func main() {
	a := 100
	go func() {
		a += 1
		go func() {
			a += 2
		}()
		go func() {
			a += 3
		}()
	}()
	fmt.Println(a)

}

mutex.go
package main

import (
	"fmt"
	"sync"
)

func main() {
	mu1 := sync.Mutex{}
	mu1.Lock()
	mu2 := sync.Mutex{}
	mu2.Lock()
	a := 100
	go func() {
		a += 1
		go func() {
			defer mu1.Unlock()
			a += 2
		}()
		go func() {
			defer mu2.Unlock()
			a += 3
		}()
	}()
	mu1.Lock()
	mu2.Lock()
	fmt.Println(a)
	mu1.Unlock()
	mu2.Unlock()
}
$ go run nomutex.go
100
$ go run mutex.go
106

ロックを使わないコードではゴルーチンが実行される前に fmt.Println が実行され、 初期値の 100 が表示されてしまいました。 (タイミングの問題なので先に実行される保証はないですが大半はこのような結果になるでしょう)

ロックを使ったコードは少々冗長になりましたが、結果は期待どおりです。

さて、この例では処理を合流する目的で Mutex を用いていますが、 本来 Mutex はリソースの排他制御に用いるのに適しています。

ゴルーチンで処理を合流させるためには 後述する sync.WaitGrouperrgroup.Group を用いるべきです。

sync.WaitGroup

WaitGroup は足し算と引き算の世界です。 Add(delta int) で内部のカウンタを増やし、 Done() でカウンタをデクリメントします。 (DoneはAddに-1を指定して呼び出してるだけです)

Done は Mutex.Unlock と同じように defer で後処理として呼び出すのがよいでしょう。

合流を待つのは Wait() で、内部のカウンタが 1 以上だと処理をブロックします。減算を忘れると処理が終了しないので注意が必要です。

waitgroup.go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(2)
	a := 100
	go func() {
		a += 1
		go func() {
			defer wg.Done()
			a += 2
		}()
		go func() {
			defer wg.Done()
			a += 3
		}()
	}()
	wg.Wait()
	fmt.Println(a)
}
$ go run waitgroup.go
106

合流を待つゴルーチンの分だけロックを生成しなくてもよいので、Mutexよりだいぶスッキリしましたね。 (Mutex ディスではなく適材適所という話です)

備考

本来、処理の同期には sync パッケージを使って行うべきですが、 当記事ではわかりやすさを考慮して、このセクション以外では time.Sleep を使って同期します。

errgroup.Group

errgroup.Group も WaitGroup と同じように処理を合流させるものですが、 利用者目線で見ると非同期処理を実行する際にgo文を明示的に呼び出さないという点が大きくことなります。

とりあえず、サンプルプログラムとして ページングを要する一覧結果を求めるAPIを高速化するというシナリオで、 それぞれ5秒かかる「一覧」と「ページング情報」を別々に取得するプログラムを作ってみます。

検索を実装するのはダルいのでかなり手抜きです。 左側が errgroup を使って非同期処理したもので、右側が思考停止で同期的に処理したものとなってます。

egtest/async.go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

const pageSize = 5

func main() {
	current := 2
	eg, _ := errgroup.WithContext(context.Background())
	var result []string
	var total int
	eg.Go(func() error {
		result = list()
		return nil
	})
	eg.Go(func() error {
		total = count()
		return nil
	})
	if err := eg.Wait(); err != nil {
		fmt.Errorf("なにかエラー: %w", err)
		return
	}
	fmt.Println(map[string]interface{}{
		"result":      result,
		"total":       total,
		"hasPrevious": current > 1,
		"hasNext":     total > current*pageSize,
	})
}

func list() []string {
	time.Sleep(time.Second * 5)
	return []string{"a", "b", "c", "d", "e"}
}

func count() int {
	time.Sleep(time.Second * 5)
	return 11
}

egtest/sync.go
package main

import (
	"fmt"
	"time"
)

const pageSize = 5

func main() {
	current := 2
	result := list()
	total := count()
	fmt.Println(map[string]interface{}{
		"result":      result,
		"total":       total,
		"hasPrevious": current > 1,
		"hasNext":     total > current*pageSize,
	})
}

func list() []string {
	time.Sleep(time.Second * 5)
	return []string{"a", "b", "c", "d", "e"}
}

func count() int {
	time.Sleep(time.Second * 5)
	return 11
}
$ time go run async.go
map[hasNext:true hasPrevious:true result:[a b c d e] total:11]
go run async.go  0.22s user 0.30s system 7% cpu 6.634 total
$ time go run sync.go
map[hasNext:true hasPrevious:true result:[a b c d e] total:11]
go run sync.go  0.25s user 0.33s system 4% cpu 11.777 total

非同期実行のほうが約5秒早くなり無事高速化を果たしました。(ビルド時間があるのでちょっと遅いですね...)

errgroup.Group の Go メソッドは 「エラーを返却する引数なしの関数」を指定することで Wait メソッドで合流するまで非同期で実行されます。

Waitは 複数の非同期タスクがあるとき、すべてが成功していれば(errを返さなければ) nil となり、 どれか一つがエラーであればそのエラーが返却されます。

少し errgroup の実装(errgroup.go) について掘り下げてみると errgroup.Group は 前述した WaitGroup をラップして使いやすくしたものと言えます。

Goメソッド内では WaitGroup のカウンタの増減と新規ゴルーチンの発行が行われます。 発行されたゴルーチン内で渡された関数を実行し、エラーが発生するとそのエラーを記録してから cancelメソッドを呼び出します。 (このエラーハンドリングは一度だけ行われる)

ここでいうcancelメソッドは WithContext が発行したコンテキスト構造体の cancel メソッドに紐づくので エラーが発生するとこのコンテキストを渡している処理全てにキャンセルが伝播します。(今回はめんどくさかったのでブランク変数に捨ててしまったw)

※コンテキストについてはこの後のセクションを参照

Wait メソッドは裏側で WaitGroup.Wait メソッドを呼び出すことで処理のブロッキングを実現しています。 最後に内部で記録した err フィールドを返却しておしまいです。

context.Context

context は並行処理間で状態を伝達(伝播)するためのパッケージです。

コンテキストは次のように初期化できます。

ctx := context.Background()

具体的に伝播するものは キャンセル(状態) です。 With から始まるメソッドに ctx を与えることで、 parent に ctx を持つ新たなコンテキストを作成します。

Value

Context に値を持たせるには context.WithValue (コード) 関数を用います。 WithValue 関数は ctx と key, value を受け取り、新しい ctx を返却します。

Context 一つに対し key と value は一組しか持てないため、階層化することで複数の値を管理します。 つまり、複数の値をもたせたい場合は、その回数分 WithValue メソッドを呼び出して階層化するということになります。

コンテキストから値を抽出する場合は、 自身のコンテキストを基底として親コンテキストを再帰的に遡り、該当するキーをもつコンテキストを探すのです。 最後まで見つからなければ nil が返却されます。

contextvaluepropagation.go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx := context.Background()
	// ctx をベースに aCtx を作り、a に 1 をもたせる
	aCtx := context.WithValue(ctx, "a", 1)
	go func(ctx context.Context) {
		go func(ctx context.Context) {
			fmt.Println("a1:", ctx.Value("a"), ctx.Value("b"), &ctx)
		}(ctx)

		// aCtx をベースに aCtx2 を作り、 a に 2 をもたせる
		aCtx2 := context.WithValue(ctx, "a", 2)
		go func(ctx context.Context) {
			fmt.Println("a2:", ctx.Value("a"), ctx.Value("b"), &ctx)
		}(aCtx2)

		// aCtx2 をベースに aCtx3 を作り、 a に 3 をもたせる
		aCtx3 := context.WithValue(aCtx2, "a", 3)
		go func(ctx context.Context) {
			fmt.Println("a3:", ctx.Value("a"), ctx.Value("b"), &ctx)
		}(aCtx3)
	}(aCtx)

	// aCtx をベースに bCtx を作り b に 10 をもたせる
	bCtx := context.WithValue(aCtx, "b", 10)
	go func(ctx context.Context) {
		go func(ctx context.Context) {
			fmt.Println("b1:", ctx.Value("a"), ctx.Value("b"), &ctx)
		}(ctx)

		// bCtx をベースに bCtx2 を作り b に 20 をもたせる
		bCtx2 := context.WithValue(ctx, "b", 20)
		go func(ctx context.Context) {
			fmt.Println("b2:", ctx.Value("a"), ctx.Value("b"), &ctx)
		}(bCtx2)

		// bCtx2 をベースに bCtx3 を作り b に 30 をもたせる
		bCtx3 := context.WithValue(bCtx2, "b", 30)
		// bCtx2 をベースに bCtx4 を作り b に 40 をもたせる
		bCtx4 := context.WithValue(bCtx2, "b", 40)
		go func(ctx context.Context) {
			fmt.Println("b3:", ctx.Value("a"), ctx.Value("b"), &ctx)
		}(bCtx3)
		go func(ctx context.Context) {
			fmt.Println("b4:", ctx.Value("a"), ctx.Value("b"), &ctx)
		}(bCtx4)
	}(bCtx)
	time.Sleep(1 * time.Second)
}
$ go run contextvaluepropagation.go
a3: 3 <nil> 0xc000010020
a1: 1 <nil> 0xc0000641f0
a2: 2 <nil> 0xc000064200
b1: 1 10 0xc0000aa000
b2: 1 20 0xc000064210
b3: 1 30 0xc0000aa010
b4: 1 40 0xc0000b4000

このプログラムのコンテキストの階層を俯瞰するとこんな感じになるでしょう。

ctx
aCtx a = 1
aCtx2 a = 2
aCtx3 a = 3
bCtx b = 10
bCtx2 b = 20
bCtx3 b = 30
bCtx4 b = 40

たとえば、 bCtx3 に対して a のキーを参照しようとすると、遡った直近の aCtx が持つ 1 が取れます。

WithValue は新たな ctx を返却するため、 一度作った ctx の値が別の処理などで書き換えられる心配がありません。

逆に子ゴルーチンの値をコンテキスト経由で親ゴルーチンに渡すといったことはできません。 コンテキストの用途を「共有」ではなく「伝達」と書いたのはこのような性質からです。

子から親にデータを渡す場合はチャネル か map 等の参照型の変数を使います。

備考

並列環境下で map の同じキーを同時に書き換えるとパニックが発生します。

mapconflict.go
package main

import (
	"fmt"
	"time"
)

func keepRewriting(m map[string]string, key string, value string) {
	for {
		m[key] = value
	}
}

func main() {
	m := make(map[string]string)
	go keepRewriting(m, "a", "100")
	go keepRewriting(m, "a", "200")
	time.Sleep(5 * time.Second)
	fmt.Println("map:", m)
}
$ go run mapconflict.go
fatal error: concurrent map writes

goroutine 7 [running]:
runtime.throw(0x10d2c83, 0x15)
      /go/src/runtime/panic.go:774 +0x72 fp=0xc000036f18 sp=0xc000036ee8 pc=0x1029562
runtime.mapassign_faststr(0x10b2360, 0xc000064180, 0x10d049b, 0x1, 0x0)
      /gosrc/runtime/map_faststr.go:211 +0x417 fp=0xc000036f80 sp=0xc000036f18 pc=0x1010467
main.keepWriting(0xc000064180, 0x10d049b, 0x1, 0x10d052d, 0x3)
      mapconflict.go:10 +0x4b fp=0xc000036fb8 sp=0xc000036f80 pc=0x10997db
runtime.goexit()
      /go/src/runtime/asm_amd64.s:1357 +0x1 fp=0xc000036fc0 sp=0xc000036fb8 pc=0x10535b1
created by main.main
      mapconflict.go:17 +0xc2

goroutine 1 [sleep]:
runtime.goparkunlock(...)
      /go/src/runtime/proc.go:310
time.Sleep(0x12a05f200)
      /go/src/runtime/time.go:105 +0x157
main.main()
      mapconflict.go:18 +0xd5

goroutine 6 [runnable]:
main.keepWriting(0xc000064180, 0x10d049b, 0x1, 0x10d0527, 0x3)
      mapconflict.go:10 +0x4b
created by main.main
      mapconflict.go:16 +0x76
exit status 2

(1コア環境では再現しません)

複数のゴルーチンで map を書き換える場合は ロックをかけるか、 sync.Map を使いましょう。

maplock.go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Map struct {
	v   map[string]string
	mux sync.Mutex
}

func (m *Map) set(key string, value string) {
	m.mux.Lock()
	m.v[key] = value
	m.mux.Unlock()
}

func keepRewriting(m *Map, key string, value string) {
	for {
		m.set(key, value)
	}
}

func main() {
	m := &Map{v: make(map[string]string), mux: sync.Mutex{}}
	go keepRewriting(m, "a", "100")
	go keepRewriting(m, "a", "200")
	time.Sleep(5 * time.Second)
	fmt.Println("map:", m)
}

syncmap.go
package main

import (
	"fmt"
	"sync"
	"time"
)

func keepRewriting(m *sync.Map, key string, value string) {
	for {
		m.Store(key, value)
	}
}

func main() {
	m := &sync.Map{}
	go keepRewriting(m, "a", "100")
	go keepRewriting(m, "a", "200")
	time.Sleep(5 * time.Second)
	fmt.Println("map:", m)
}
$ go run maplock.go
map: &{map[a:100] {3 0}}
$ go run syncmap.go
map: &{{2 1} {{map[] true}} map[a:0xc00000e010] 0}

WithValue 関数の key 引数にはどんな型でも指定できます。 今回は単純化するためキーには string型 を指定しましたが、衝突を避けるために独自の型を使うことが推奨されています。

type ctxKey string
ctx = context.WithValue(ctx, ctxKey("color"), "black")
ctx.Value(ctxKey("color"))

Cancel

キャンセル状態を手動で伝播するための仕組みです。

Cancel を伝播するコンテキストは context.WithCancel (コード) で作成します。

WithCancel は 第一引数に ctx を受け取って新しい ctx と cancel の組を返却します。 任意のタイミングで cancel() を呼び出すと context.Done() がチャネルとして読み出し可能になります。

この性質を利用してキャンセルされるまで実行されたくない処理をブロックします。

contextcancel.go
package main

import (
	"context"
	"fmt"
	"time"
)

func gor1(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("gor1 done")
}

func gor2(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("gor2 done")
}

func main() {
	fmt.Println("started")
	defer fmt.Println("finished")
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)

	go gor1(ctx)
	go gor2(ctx)
	time.Sleep(1 * time.Second)
	cancel()
	time.Sleep(1 * time.Second)
}
$ go run contextcancel.go
started
gor2 done # どちらが先に実行されるかは不定
gor1 done # どちらが先に実行されるかは不定
finished

Timeout / Deadline

時間によって自動でキャンセル状態を伝播するためのコンテキストです。 context.WithTimeout (コード) で作成したコンテキストは指定した時間(Duration)経過によりキャンセルが発火し、 context.WithDeadline (コード) で作成したコンテキストは指定した時刻(Time)になったらキャンセルが発火します。

ちなみに context.WithTimeout は現在時刻に Duration を加えて context.WithDeadline を呼び出しているだけですが、 WithTimeout を使うことが多いでしょう。

Timeout

Deadline

contexttimeout.go
package main

import (
	"context"
	"fmt"
	"time"
)

func gor1(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("gor1 done")
}

func gor2(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("gor2 done")
}

func main() {
	fmt.Println("started")
	defer fmt.Println("finished")
	ctx := context.Background()
	ctx, _ = context.WithTimeout(ctx, 1*time.Second)

	go gor1(ctx)
	go gor2(ctx)
	time.Sleep(2 * time.Second)
}

contextdeadline.go
package main

import (
	"context"
	"fmt"
	"time"
)

func gor1(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("gor1 done")
}

func gor2(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("gor2 done")
}

func main() {
	fmt.Println("started")
	defer fmt.Println("finished")
	ctx := context.Background()
	ctx, _ = context.WithDeadline(ctx, time.Now().Add(1*time.Second))

	go gor1(ctx)
	go gor2(ctx)
	time.Sleep(2 * time.Second)
}
$ go run contexttimeout.go
started
gor1 done # どちらが先に実行されるかは不定
gor2 done # どちらが先に実行されるかは不定
finished
$ go run contextdeadline.go
started
gor1 done # どちらが先に実行されるかは不定
gor2 done # どちらが先に実行されるかは不定
finished

Cancel propagation

Value のセクションにて コンテキストは階層構造を持つと言いましたが、 値だけでなくキャンセル状態も伝播されます。

以下のプログラムはコンテキストを階層構造にしてキャンセル状態を伝播しています。

階層は次のようになっており、 ctx1 に紐づく cancel1 を呼び出しているため、 ctx1 と、それを親に持つ ctx1a でブロックしている処理だけが実行されてほしいです。

  • ctx0

    • ctx1, cancel1

      • ctx1a

    • ctx2, _

    • ctx3, _

contextcancelpropagation.go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx0 := context.Background()

	go func() {
		ctx1, cancel1 := context.WithCancel(ctx0)
		cancel1()
		go func() {
			ctx1a := context.WithValue(ctx1, "a", 1)
			go func() {
				<-ctx1a.Done()
				fmt.Println("1a finished")
			}()
			<-ctx1.Done()
			fmt.Println("1 finished")
		}()

		ctx2, _ := context.WithCancel(ctx0)
		go func() {
			<-ctx2.Done()
			fmt.Println("2 finished")
		}()

		go func() {
			<-ctx0.Done()
			fmt.Println("3 finished")
		}()
	}()

	time.Sleep(1 * time.Second)
}
$ go run contextcancelpropagation.go
1 finished # どちらが先に実行されるかは不定
1a finished # どちらが先に実行されるかは不定

期待通りになりましたね。 ctx2, ctx3 は ctx1 とは兄弟関係にありますがキャンセルは実行されません。 ctx1aWithValue で作られましたが、親が ctx1 なのでキャンセル状態を受け取ることができました。

Memory leak

ゴルーチンは GC の対象でないため、終了しないままゾンビ化することがあります。

いくらゴルーチンが軽いとはいってもプロセスが長時間起動し、大量のゾンビゴルーチンが取り残されれば メモリ使用量はじわじわと増加していき、いずれは OOM Killer に殺されてしまうことでしょう。

チャネルの読み書きや sync パッケージのメソッドたちが、処理をブロックしてしまうのが主な原因ですが 処理が終わったゴルーチンがブロックされていても気づきにくいので、これは意外と難しい問題です。

runtime/pprof パッケージの pprof.Lookup("goroutine") とすることで稼働中のゴルーチンを取得できるのでこれをつかって監視してみることにします。

以下は(終わらない)ゴルーチンを0.5秒間隔で生成し、別ゴルーチンにて1秒間隔で個数をカウントするプログラムです。

pprof.go
package main

import (
	"log"
	"os"
	"runtime/pprof"
	"time"
)

func main() {
	log.SetFlags(log.Ltime | log.LUTC)
	log.SetOutput(os.Stdout)

	go func() {
		goroutines := pprof.Lookup("goroutine")
		for range time.Tick(1 * time.Second) {
			log.Printf("goroutine count: %d\n", goroutines.Count())
		}
	}()

	var blockForever chan struct{}
	for i := 0; i < 10; i++ {
		go func() { <-blockForever }()
		time.Sleep(500 * time.Millisecond)
	}
}
$ go run pprof.go
00:00:00 goroutine count: 4
00:00:01 goroutine count: 6
00:00:02 goroutine count: 8
00:00:03 goroutine count: 10
00:00:04 goroutine count: 12

このようにゴルーチン件数を一定周期で監視しログを出力したり、 異常な件数のときにアラートを送るようにもできるでしょう。

以上です。 概ね気になっていたことは書けたと思います。

間違いがあったら指摘お願いします 🙇

Go言語による並行処理
ゴルーチンの仕組みから利用方法に至るまで幅広く書かれています。
全部目を通しましたがまだ全部は理解できてません。もう少しパイプラインのセクションを理解したい。
詳しく知りたい方は一読をおすすめします。