Go の channel 処理パターン集
この記事は Go Advent Calendar 2017 の 1 日目の記事です。
Go の長所に goroutine による非同期処理がありますが、どうしても channel の取り回しで黒魔術化しがちです。少しでも闇を減らしていきたいので、 channel らへんの取り回しについてパターンをまとめました。チートシート的に使えれば嬉しいです。
Go の channel の基礎
入門資料として使いたいので、本題に入る前にざっくり基礎を。
定義のパターン
channel には capacity という概念があります。 capacity は channel 内でバッファリングしておける容量のことで、 capacity に空きが無い場合は送信側が処理待ち(ブロック)します。
capacity なしの定義(int 型で capacity 0)
ch := make(chan int)
capacity ありの定義(int 型で capacity 10)
ch := make(chan int, 10)
close
channel は close によって閉じることができます。受信側で検知できるので、受信側に終了を伝えるために使います。
close(ch)
送信のパターン
channel で定義した型の値を投入することができます。前述の通り、 capacity に余裕がなければ受信されるまでブロックします。
ch <- 1
受信のパターン
受信の書き方は 6 パターンあります。
受信、もしくは close したことだけ検知するパターン。非同期処理の終了を待つ場合に使うことが多いです。
<-ch
値を取得するパターン。 close された場合はゼロ値が入ります
v := <-ch
値が入ったのか close されたのかを判別するパターン。 ok が false なら close です。
v, ok := <-ch
if !ok {
// closed
}
select-case で待ち受けるパターン。順序は保証されませんが複数 channel を同時に待ち受けることができます。
select {
case v := <-ch1:
// ch 1
case v := <-ch2:
// ch 2
}
default をつけると既に値が入っている channel がなければ待たずに default が実行されます。
select {
case v := <-ch1:
// ch 1
case v := <-ch2:
// ch 2
default:
// default
}
for-range で処理することもできます。 close されたらループを抜けます。
for v := range ch {
// do something
}
// closed
使うときの主な注意点
踏みがちな注意点です。これらを回避するために黒魔術が生まれがちです。
- close した channel に event を送ると panic する
- 多重に close しても panic する
- 受信が先にいなくなると block し続ける
close 済みな channel を操作すると無視するのではなくしっかり panic するのが Go らしいところです。曖昧さの排除。
よくあるアンチパターン
実用的なパターン集に入る前に、見かけがちなアンチパターンを紹介しておきます。ブロックし続ける可能性があったり、 panic する可能性があったりします。
終了を受信側に伝えるのに終了 event を送る
シンプルな処理なら上手く動かせると思いますが、受信側が終了用 channel 以外の何かしらで待ちを抜けた場合に迷子になります。
func A(done chan struct{}) {
// do something
done <- struct{}{}
}
func B() {
done := make(chan struct{})
go A(done)
// do something
// この辺で return や panic したら A の goroutine が受け取り側がなく待ち続ける
<-done
}
close させるのが良いですね。
func A(done chan struct{}) {
// do something
close(done)
}
func B() {
done := make(chan struct{})
go A(done)
// do something
<-done
}
たまに select-case でこういうコードを見かけたりもしますが、前述の通りブロックし続ける可能性があります。
func worker(jobChan chan Job, done chan struct{}) {
for {
select {
case j := <-jobChan:
// do something
case <-done:
return
}
}
}
func B(jobs []Job) {
jobChan := make(chan Job)
done := make(chan struct{})
go worker(jobChan, done)
for _, j := range jobs {
jobChan <- j
}
done <- struct{}{}
}
上記ようにシンプルな処理なら、出来る限り for-range を使っていくのがおすすめです。 close されると抜けてくれます。サンプルは以下。
func worker(jobChan chan Job) {
for j := range jobChan {
// do something
}
}
func B(jobs []Job) {
jobChan := make(chan Job)
defer close(jobChan)
go worker(jobChan)
for _, j := range jobs {
jobChan <- j
}
}
close される可能性のある channel を第一戻り値だけで受ける
select で待ち受ける場合にうっかりやりがちです。 close されると第一戻り値はゼロ値になるので、想定外の挙動になる可能性があります。
func A() {
select {
case v := <-valueORCloseChan:
// do something with v
// close された場合は vがゼロ値
case v := <-otherChan:
// do something
}
}
ok で判定して分岐させます。
func A() {
select {
case v, ok := <-valueORCloseChan:
if !ok {
// closed
}
// do something with v
case v := <-otherChan:
// do something
}
}
goroutine から複数 event が来る可能性があるのに始めのものを受けたら抜ける
error 用の channel を待つ場合にありがちです。始めのもの以外は迷子になります。特に capacity が 0 の channel の goroutine はずっとブロックし続けます。
func A(wg *sync.WaitGroup, errChan chan error) {
defer wg.Done()
if err := doSomething(); err != nil {
// ここが複数回発火すると詰まる
errChan <- err
}
}
func B() error {
wg := new(sync.WaitGroup)
errChan := make(chan error)
done := make(chan struct{})
wg.Add(2)
go A(wg, errChan)
go A(wg, errChan)
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case err := <-errchan:
return err
}
}
受け取るのは一つでいいのであれば、イベント送信側を sync.Once で 1 event しか送られないように統一するのが便利です。もし sync.Once を渡していくのが複雑になる場合は、そもそも設計が複雑すぎる方を疑いたい。
var once sync.Once
func A(wg *sync.WaitGroup, errChan chan error) {
defer wg.Done()
if err := doSomething(); err != nil {
once.Do(func() {
errChan <- err
})
}
}
func B() error {
wg := new(sync.WaitGroup)
errChan := make(chan error)
done := make(chan struct{})
wg.Add(2)
go A(wg, errChan)
go A(wg, errChan)
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case err := <-errchan:
return err
}
}
select の発火順は無保証なのでちょっと怪しいコードです。真面目にやるなら done ではなく errChan を close ですね。
受け取るのが error であれば errgroup がオススメです。 sync.WaitGroup のような使用感で、 context を使って一つ失敗したら他の処理を cancel する機構付きなので非常に便利。
イベント送信時に channel が closed か調べたい
複数の送信側処理があり、受信は一つで良い場合に欲しくなるやつ。たまに欲しくなります。
が、今のところ便利なインターフェースは提供されていません。
受信してみて第二戻り値が false なら closed ですが、受信すると第一戻り値を普通に受け取っちゃう場合もあるのでよろしくない。
一応こんな感じで実装してみましたが、アンチパターン臭がすごい。
func isClosed(ch chan Value) bool {
select {
case v, ok := <-ch:
if !ok {
return true
}
go func() {
ch <- v
}()
default:
}
return false
}
func A(ch chan Value) {
if !isClosed(ch) {
ch <- SomeValue()
}
}
上記のコードは lock を仕込んでおらず、チェック後に close されている可能性が残っているので実用的ではないです。 lock 処理まで入れるとがんじがらめな黒魔術になるので別の手段を取ったほうが良さそう。
高速に送りまくる
例えばシンプルな slice 処理に比べるとパフォーマンスがどうしても悪くなるので、高パフォーマンスが必要な箇所ではベンチを取って検証した方がいいです。
ちょっと雑ですが、シンプルに slice で入れ出しする場合と channel を経由して入れ出しする場合のベンチを取ってみました。
func BenchmarkWithSimpleSlice(b *testing.B) {
src := make([]int, 10000)
dst := make([]int, 10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
// write
for i := range src {
dst[i] = src[i]
}
// read
for i := 0; i < len(src); i++ {
_ = dst[i]
}
}
}
func BenchmarkWithChannel(b *testing.B) {
src := make([]int, 10000)
ch := make(chan int, 10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
// write
for i := range src {
ch <- src[i]
}
// read
for i := 0; i < len(src); i++ {
<-ch
}
}
}
結果が以下。
BenchmarkWithSimpleSlice 2000000 8991 ns/op
BenchmarkWithChannel 30000 580989 ns/op
570,000 ns/op
くらいの差が出ているので、 57 ns/item
くらいでしょうか。ベンチマークとしての妥当性はともかく、高パフォーマンスが必要なところではある程度の考慮が必要になりそうなことは伝わるかと思います。
channel で並列数管理する場合のパターン
ここから実用的なパターン集です。
並列数 10 だとこんな感じ。
func A(jobs []Job) {
sem := make(chan struct{}, 10)
for _, j := range jobs {
sem <- struct{}{}
go func() {
j.Run()
<-sem
}()
}
}
capacity の仕組みを使ってブロックさせることで並列数管理をしています。
worker 的な用途で使う場合、 queue 取得のタイミングに注意。 channel に入れる前に queue を取得すると持ったままブロックします。考慮するなら以下のような感じでしょうか。
func A(jobs chan Job) {
sem := make(chan struct{}, 10)
for {
sem <- struct{}{}
j, ok := <-jobs
if !ok {
break
}
go func() {
j.Run()
<-sem
}()
}
}
重み付けなどが必要になったら semaphore が便利です。
channel で処理の終了を待つパターン
シンプルに待つだけ。 close して使うのがおすすめです。ちなみに、値が要らない場合は struct{}
がサイズ 0 のデータなのでおすすめです。
func A(done chan struct{}) {
defer close(done)
// do something
}
func B() {
done := make(chan struct{})
go A(done)
// wait
<-done
}
channel の closed を検知するパターン
アンチパターンと言ったばかりですが、 close するだけの channel なら使い所があります。
特に、メソッド内で context の状態を確認するのにたまに使います。 cancel されているか確認するのに Done() で返ってくる chan が closed か判定したい。 select-case の default を使います。
var isDone bool
select {
case <-ctx.Done():
isDone = true
default:
isDone = false
}
こんな感じ。自分はメソッド化して使ったりします。
func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
たまに Done 検知のサンプルで default がないものがありますが、 select 文では default がないとどれかの channel からイベントが返るまでブロックするので注意です。
channel で繰り返し処理する場合のパターン集
基本的には 送信側を先に片付ける -> channel を close する -> 受信側が待ちを抜ける
の順番で処理するように意識するのが大事です。 select 待ち受けも使い所はありますが、なるべく for-range と defer を使っていく方がおすすめです。
送信側は 1 つ and 受信側は 1 つ
並列化するほどじゃないけど受け取り処理はさっさと済ませて遅延処理させたいシンプルなパターン。 channel の capacity が溢れない程度に受信側が上手いこと処理してくれている(間に合っている)場合はコレで足りる。
func worker(queue chan Job) {
for j := range queue {
// do something
}
}
func A() error {
queue := make(chan Job, 100)
defer close(queue)
go worker(queue)
for {
j, err := someHandle()
if err != nil {
return err
}
queue <- j
}
return nil
}
間に合わなくなってきたら次述のように受信側を増やしていきます。
送信側は 1 つ and 受信側は複数
受け取り側の処理が 1 つじゃ追いつかない場合によく書く worker スタイル。ただの goroutine 起動との違いは並列数の管理です。
送信側が 1 つなので、受信側が複数に増えても channel を close してあげれば問題なし。
func worker(queue chan Job) {
for j := range queue {
// do something
}
}
const workerNum = 10
func A() error {
queue := make(chan Job, 100)
defer close(queue)
for i := 0; i < workerNum; i++ {
go worker(queue)
}
for {
j, err := someHandle()
if err != nil {
return err
}
queue <- j
}
return nil
}
送信側は複数 and 受信側は 1 つ
並列処理の結果を収集したい場合によく書く。キャンセルにしろ正常終了にしろ、全ての送信処理が終わったことを確認してから受信側を抜ける必要がある。
送信処理の goroutine があといくつ残っているかを数えてあげて、全て片付いたら close してあげれば OK 。atomic 系で数えてあげてもいいけど、 sync.WaitGroup が好みです。
とあるページング的リクエストを分割して並列実行し、結果を配列に集めるパターン。
func someRequest(from, to uint, resultChan chan Result) {
results := request(from, to)
for _, r := range results {
resultChan <- r
}
}
func A() []Result {
resultChan := make(chan Result, 100)
wg := new(sync.WaitGroup)
for i := 0; i < 99; i++ {
wg.Add(1)
from := i * 100
to := (i + 1) * 100
go func() {
defer wg.Done()
someRequest(from, to, resultChan)
}()
}
go func() {
wg.Wait()
close(resultChan)
}()
results := make([]Result, 0, 10000)
for _, r := range resultChan {
results = append(results, r)
}
return results
}
各送信側にキャンセルを伝えたい場合は専用の channel を渡してあげてもいいけど、 context を渡すのが楽。外部コマンドや外部リクエストにもそのまま渡せるので。
func someRequest(ctx context.context, from, to uint, resultChan chan Result) {
results := request(ctx, from, to)
for _, r := range results {
resultChan <- r
}
}
func A(ctx context.Context) []Result {
resultChan := make(chan Result, 100)
wg := new(sync.WaitGroup)
for i := 0; i < 99; i++ {
wg.Add(1)
from := i * 100
to := (i + 1) * 100
go func() {
defer wg.Done()
someRequest(ctx, from, to, resultChan)
}()
}
go func() {
wg.Wait()
close(resultChan)
}()
results := make([]Result, 0, 10000)
for _, r := range resultChan {
results = append(results, r)
}
return results
}
送信側は複数 and 受信側は複数
Dispatcher - Worker 的な。 Dispatcher を独自に用意しているサンプルが散見されますが、型によって受け渡す worker を変えて云々、みたいな分岐が必要なければ channel 自体が dispatcher になります。
基本的には受信側が増える分には close を使えば問題なし。
func worker(queue chan Job) {
for j := range queue {
// do something
}
}
func someRequest(from, to uint, callbackJobQueue chan Job) {
results := request(from, to)
for _, r := range results {
callbackQueue <- NewJob(r)
}
}
const (
workerNum = 10
)
func A() {
queue := make(chan Job, 100)
defer close(queue)
wg := new(sync.WaitGroup)
for i := 0; i < workerNum; i++ {
go worker(queue)
}
for i := 0; i < 99; i++ {
wg.Add(1)
from := i * 100
to := (i + 1) * 100
go func() {
defer wg.Done()
someRequest(from, to, queue)
}()
}
wg.Wait()
}
結果を収集したければ worker に収集用の channel を渡せば OK 。
select 待ち受けについて
基本的には for-range をおすすめします。 select を使うのは以下のような場合かなーと。
- ループではなく一回だけ発火するやーつ
- 全て close 待ち
- どれかが close されたら処理を抜ける場合。とはいえ発火順序は保証されないので使い所があまり思いつかなかった
- 待ち受ける複数 channel のうち、一つしか返ってこないことが保証される場合
- 非同期処理で resolve か reject のどちらかだけ返ってくる場合など。
- 一個受けたらもう抜けちゃっていいやつ
- 例えば signal 処理。プロセスを終了するなら一個だけ受ければいい
- 全て close 待ち
- 抜ける気がないループ
- プロセスが死ぬまで動かし続けるので色々気にしないでいい場合
- ちゃんと送信側が片付いたことを全て確認できている
- 前述の送信側が複数あるパターンと同じく、 sync.WaitGroup などでちゃんと管理できている場合。ただし、 for-range と違い全ての channel が受信済みであることも保証しないと受信漏れが生じるのでややこしい。
- それぞれで for-range を組むほうがシンプルになりがち(個人の見解)
受信漏れの考慮に苦心しがちなので、 channel でループする場合は for-range に寄せていったほうが楽かなーと思っています。
timeout 処理や context.Done のハンドリングは悩ましいところですが、なるべく context の仕組みに頼り、どうしても必要なところは受信漏れを考慮した上で select 待ち受けするのがいいかと。 capacity なしの結果用 channel を返す上、 timeout 処理がちゃんと実装されていないメソッドがあったとしても、 context 対応でシンプルに使える wrapper を用意したいところ。
ちなみにありがちなミスとして、 select in for loop で close された channel があると暴走します。
func main() {
ch1 := make(chan struct{})
ch2 := make(chan struct{})
close(ch1)
for {
select {
case <- ch1:
log.Println("1")
case <- ch2:
log.Println("2")
}
}
}
上記のように select-case で closed な channel があるとひたすら発火して 1
が出力され続けます。多段の event 待ち受けで受信側から送信側に送信停止を伝播させたい場合にやりがち。
func A(ch1 chan Event, stopA chan struct{}) {
defer close(ch1)
t := time.NewTicker(3 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
ch1 <- handle()
case <-stopA:
return
}
}
}
func B(ch1 chan Event, ch2 chan Job, stopB chan struct{}, stopA chan struct{}) {
defer close(ch2)
for {
select {
case e, ok := <-ch1:
if !ok {
return
}
ch2 <- NewJob(e)
case <-stopB:
close(stopA)
}
}
}
func C(ch2 chan Job, stopB chan struct{}) {
for j := range ch2 {
if err := j.Run(); err != nil {
close(stopB)
}
}
}
上記のような Event を受けて Job を dispatch するコードだと、 B の stopB
の case が連続発火して stopA
を多重 close し、 panic します。
対処としては channel ごとに待ち受ける goroutine を分けて処理したり、 nil で上書きして case 判定から除外したりでしょうか? 個人的には設計が怪しくなっていると判断してリファクタしたくなります。
エラー収集(一つだけ拾えば OK )
errgroup が楽すぎるので使っていきましょう
エラー収集(複数集める)
for-range で各処理から error を集め、処理が全て終わったら close します。
func A() []error {
errChan := make(chan error, 100)
wg := new(sync.WaitGroup)
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err := doSomething(); err != nil {
errChan <- err
}
}()
}
go func() {
wg.Wait()
close(errChan)
}()
errors := make([]error, 0, 100)
for r := range errChan {
errors = append(errors, r)
}
return errors
}
エラー収集(結果も集める)
結果と error をまとめた struct を用意すれば error のみと同じように書けます。
type Wrapper struct {
Result Result
Error error
}
func A() []Wrapper {
resultChan := make(chan Wrapper, 100)
wg := new(sync.WaitGroup)
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
result, err := doSomething();
resultChan <- Wrapper{
Result: result,
Error: err,
}
}()
}
go func() {
wg.Wait()
close(resultChan)
}()
results := make([]Wrapper, 0, 100)
for r := range resultChan {
results = append(results, r)
}
return results
}
channel を増やして並べる書き方もできます。ただし、 capacity に注意しないとブロックします(以下の例だと errChan
を先に処理しているので、 resultChan
が飽和したら詰まる)。
func A() ([]Result, []error) {
errChan := make(chan error, 100)
resultChan := make(chan Result, 100)
wg := new(sync.WaitGroup)
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
result, err := doSomething();
if err != nil {
errChan <- err
return
}
resultChan <- result
}()
}
go func() {
wg.Wait()
close(errChan)
close(resultChan)
}()
errors := make([]error, 0, 100)
for r := range errChan {
errors = append(errors, r)
}
results := make([]Result, 0, 100)
for r := range resultChan {
results = append(results, r)
}
return results, errors
}
前者のほうが安全ですが、個人的には後続処理で結果と error を分離するなら capacity に気をつけながら後者でもいいかなーと思っています。
以上
とりあえずこの辺のパターンを組み合わせれば頻出する処理は書けるかと思います。誤りや改善点など、何かありましたらぜひご指摘いただければ幸いです。
なお、実際には sync.WaitGroup を channel に変換しているところなどはメソッド化して使うのがオススメです。
個人的にはシンプルな for-range ではなく select-case の loop で色々やり始めたら黄色信号かなーと思っているのですが、使い所も確かにあるので難しい。
重めの処理を非同期 worker でゴリゴリやりたくなったらぜひ参考にしてみてください!