Go の channel 処理パターン集

この記事は Go Advent Calendar 2017 の1日目の記事です。

Go の長所に goroutine による非同期処理がありますが、どうしても channel の取り回しで黒魔術化しがちです。少しでも闇を減らしていきたいので、 channel らへんの取り回しについてパターンをまとめました。チートシート的に使えれば嬉しいです。

なお、サムネイルは行きつけの猫カフェの子猫たちです。かわいい。

Go の channel の基礎

入門資料として使いたいので、本題に入る前にざっくり基礎を。

定義のパターン

channel には capacity という概念があります。 capacity は channel 内でバッファリングしておける容量のことで、 capacity に空きが無い場合は送信側が処理待ち(ブロック)します。

capacity なしの定義(int型で capacity 0)

ch := make(chan int)

capacity ありの定義(int型で capacity 10)

ch := make(chan int, 10)

close

channel は close によって閉じることができます。受信側で検知できるので、受信側に終了を伝えるために使います。

close(ch)

送信のパターン

channel で定義した型の値を投入することができます。前述の通り、 capacity に余裕がなければ受信されるまでブロックします。

ch <- 1

受信のパターン

受信の書き方は6パターンあります。

受信、もしくは close したことだけ検知するパターン。非同期処理の終了を待つ場合に使うことが多いです。

<-ch

値を取得するパターン。 close された場合はゼロ値が入ります

v := <-ch

値が入ったのか close されたのかを判別するパターン。 ok が false なら close です。

v, ok := <-ch
if !ok {
    // closed
}

select-case で待ち受けるパターン。順序は保証されませんが複数 channel を同時に待ち受けることができます。

select {
case v := <-ch1:
    // ch 1
case v := <-ch2:
    // ch 2
}

default をつけると既に値が入っている channel がなければ待たずに default が実行されます。

select {
case v := <-ch1:
    // ch 1
case v := <-ch2:
    // ch 2
default:
    // default
}

for-range で処理することもできます。 close されたらループを抜けます。

for v := range ch {
    // do something
}
// closed

使うときの主な注意点

踏みがちな注意点です。これらを回避するために黒魔術が生まれがちです。

  • close した channel に event を送ると panic する
  • 多重に close しても panic する
  • 受信が先にいなくなると block し続ける

close 済みな channel を操作すると無視するのではなくしっかり panic するのが Go らしいところです。曖昧さの排除。

よくあるアンチパターン

実用的なパターン集に入る前に、見かけがちなアンチパターンを紹介しておきます。ブロックし続ける可能性があったり、 panic する可能性があったりします。

終了を受信側に伝えるのに終了 event を送る

シンプルな処理なら上手く動かせると思いますが、受信側が終了用 channel 以外の何かしらで待ちを抜けた場合に迷子になります。

func A(done chan struct{}) {
    // do something
    done <- struct{}{}
}

func B() {
    done := make(chan struct{})
    go A(done)

    // do something
    // この辺で return や panic したら A の goroutine が受け取り側がなく待ち続ける

    <-done
}

close させるのが良いですね。

func A(done chan struct{}) {
    // do something
    close(done)
}

func B() {
    done := make(chan struct{})
    go A(done)

    // do something

    <-done
}

たまに select-case でこういうコードを見かけたりもしますが、前述の通りブロックし続ける可能性があります。

func worker(jobChan chan Job, done chan struct{}) {
    for {
        select {
        case j := <-jobChan:
            // do something
        case <-done:
            return
        }
    }
}

func B(jobs []Job) {
    jobChan := make(chan Job)
    done := make(chan struct{})
    go worker(jobChan, done)
    
    for _, j := range jobs {
        jobChan <- j
    }

    done <- struct{}{}
}

上記ようにシンプルな処理なら、出来る限り for-range を使っていくのがおすすめです。 close されると抜けてくれます。サンプルは以下。

func worker(jobChan chan Job) {
    for j := range jobChan {
        // do something
    }
}

func B(jobs []Job) {
    jobChan := make(chan Job)
    go worker(jobChan)
    
    for _, j := range jobs {
        jobChan <- j
    }

    close(jobChan)
}

close される可能性のある channel を第一戻り値だけで受ける

select で待ち受ける場合にうっかりやりがちです。 close されると第一戻り値はゼロ値になるので、想定外の挙動になる可能性があります。

func A() {
    select {
    case v := <-valueORCloseChan:
        // do something with v
        // close された場合は vがゼロ値
    case v := <-otherChan:
        // do something
    }
}

ok で判定して分岐させます。

func A() {
    select {
    case v, ok := <-valueORCloseChan:
        if !ok {
            // closed
        }
        // do something with v
    case v := <-otherChan:
        // do something
    }
}

goroutine から複数 event が来る可能性があるのに始めのものを受けたら抜ける

error 用の channel を待つ場合にありがちです。始めのもの以外は迷子になります。特に capacity が 0 の channel の goroutine はずっとブロックし続けます。

func A(wg *sync.WaitGroup, errChan chan error) {
    defer wg.Done()
    if err := doSomething(); err != nil {
        // ここが複数回発火すると詰まる
        errChan <- err
    }
}


func B() error {
    wg := new(sync.WaitGroup)
    errChan := make(chan error)
    done := make(chan struct{})

    wg.Add(2)
    go A(wg, errChan)
    go A(wg, errChan)

    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil 
    case err := <-errchan:
        return err
    }
}

受け取るのは一つでいいのであれば、イベント送信側を sync.Once で 1 event しか送られないように統一するのが便利です。もし sync.Once を渡していくのが複雑になる場合は、そもそも設計が複雑すぎる方を疑いたい。

var once sync.Once

func A(wg *sync.WaitGroup, errChan chan error) {
    defer wg.Done()
    if err := doSomething(); err != nil {
        once.Do(func() {
            errChan <- err
        })
    }
}


func B() error {
    wg := new(sync.WaitGroup)
    errChan := make(chan error)
    done := make(chan struct{})

    wg.Add(2)
    go A(wg, errChan)
    go A(wg, errChan)

    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil 
    case err := <-errchan:
        return err
    }
}

select の発火順は無保証なのでちょっと怪しいコードです。真面目にやるなら done ではなく errChan を close ですね。

受け取るのが error であれば errgroup がオススメです。 sync.WaitGroup のような使用感で、 context を使って一つ失敗したら他の処理を cancel する機構付きなので非常に便利。

イベント送信時に channel が closed か調べたい

複数の送信側処理があり、受信は一つで良い場合に欲しくなるやつ。たまに欲しくなります。

が、今のところ便利なインターフェースは提供されていません。

受信してみて第二戻り値が false なら closed ですが、受信すると第一戻り値を普通に受け取っちゃう場合もあるのでよろしくない。

一応こんな感じで実装してみましたが、アンチパターン臭がすごい。

func isClosed(ch chan Value) bool {
    select {
    case v, ok := <-ch:
        if !ok {
            return true
        }
        go func() {
            ch <- v
        }()
    default:
    }
    return false
}

func A(ch chan Value) {
    if !isClosed(ch) {
        ch <- SomeValue()
    }
}

上記のコードは lock を仕込んでおらず、チェック後に close されている可能性が残っているので実用的ではないです。 lock 処理まで入れるとがんじがらめな黒魔術になるので別の手段を取ったほうが良さそう。

高速に送りまくる

例えばシンプルな slice 処理に比べるとパフォーマンスがどうしても悪くなるので、高パフォーマンスが必要な箇所ではベンチを取って検証した方がいいです。

ちょっと雑ですが、シンプルに slice で入れ出しする場合と channel を経由して入れ出しする場合のベンチを取ってみました。

func BenchmarkWithSimpleSlice(b *testing.B) {
	src := make([]int, 10000)
	dst := make([]int, 10000)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// write
		for i := range src {
			dst[i] = src[i]
		}
		// read
		for i := 0; i < len(src); i++ {
			_ = dst[i]
		}
	}
}

func BenchmarkWithChannel(b *testing.B) {
	src := make([]int, 10000)
	ch := make(chan int, 10000)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// write
		for i := range src {
			ch <- src[i]
		}
		// read
		for i := 0; i < len(src); i++ {
			<-ch
		}
	}
}

結果が以下。

 BenchmarkWithSimpleSlice       2000000              8991 ns/op
 BenchmarkWithChannel             30000            580989 ns/op

570,000 ns/op くらいの差が出ているので、 57 ns/item くらいでしょうか。ベンチマークとしての妥当性はともかく、高パフォーマンスが必要なところではある程度の考慮が必要になりそうなことは伝わるかと思います。

channel で並列数管理する場合のパターン

ここから実用的なパターン集です。

並列数10だとこんな感じ。

func A(jobs []Job) {
    sem := make(chan struct{}, 10)
    for _, j := range jobs {
        sem <- struct{}{}
        j.Run()
        <-sem
    }
}

capacity の仕組みを使ってブロックさせることで並列数管理をしています。

worker 的な用途で使う場合、 queue 取得のタイミングに注意。 channel に入れる前に queue を取得すると持ったままブロックします。考慮するなら以下のような感じでしょうか。

func A(jobs []Job) {
    sem := make(chan struct{}, 10)
    for {
        sem <- struct{}{}
        j, ok := <-jobs
        if !ok {
            break
        }
        j.Run()
        <-sem
    }
}

重み付けなどが必要になったら semaphore が便利です。

channel で処理の終了を待つパターン

シンプルに待つだけ。 close して使うのがおすすめです。ちなみに、値が要らない場合は struct{} がサイズ 0 のデータなのでおすすめです。

func A(done chan struct{}) {
    defer close(done)
    // do something
}

func B() {
    done := make(chan struct{})
    go A(done)

    // wait
    <-done
}

channel の closed を検知するパターン

アンチパターンと言ったばかりですが、 close するだけの channel なら使い所があります。

特に、メソッド内で context の状態を確認するのにたまに使います。 cancel されているか確認するのに Done() で返ってくる chan が closed か判定したい。 select-case の default を使います。

var isDone bool
select {
    case <-ctx.Done():
        isDone = true
    default:
        isDone = false
}

こんな感じ。自分はメソッド化して使ったりします。

func isDone(ctx context.Context) bool {
select {
    case <-ctx.Done():
        return true
    default:
        return false
}

たまに Done 検知のサンプルで default がないものがありますが、 select 文では default がないとどれかの channel からイベントが返るまでブロックするので注意です。

channel で繰り返し処理する場合のパターン集

基本的には 送信側を先に片付ける -> channel を close する -> 受信側が待ちを抜ける の順番で処理するように意識するのが大事です。 select 待ち受けも使い所はありますが、なるべく for-range と defer を使っていく方がおすすめです。

送信側は1つ and 受信側は1つ

並列化するほどじゃないけど受け取り処理はさっさと済ませて遅延処理させたいシンプルなパターン。 channel の capacity が溢れない程度に受信側が上手いこと処理してくれている(間に合っている)場合はコレで足りる。

func worker(queue chan Job) {
    for j := range queue {
        // do something
    }
}

func A() error {
    queue := make(chan Job, 100)
    defer close(queue)

    go worker(queue)

    for {
        j, err := someHandle()
        if err != nil {
            return err
        }
        queue <- j
    }
    return nil
}

間に合わなくなってきたら次述のように受信側を増やしていきます。

送信側は1つ and 受信側は複数

受け取り側の処理が1つじゃ追いつかない場合によく書く worker スタイル。ただの groutine 起動との違いは並列数の管理です。

送信側が1つなので、受信側が複数に増えても channel を close してあげれば問題なし。

func worker(queue chan Job) {
    for j := range queue {
        // do something
    }
}

const workerNum = 10

func A() error {
    queue := make(chan Job, 100)
    defer close(queue)

    for i := 0; i < workerNum; i++ {
        go worker(queue)
    }

    for {
        j, err := someHandle()
        if err != nil {
            return err
        }
        queue <- j
    }
    return nil
}

送信側は複数 and 受信側は1つ

並列処理の結果を収集したい場合によく書く。キャンセルにしろ正常終了にしろ、全ての送信処理が終わったことを確認してから受信側を抜ける必要がある。

送信処理の goroutine があといくつ残っているかを数えてあげて、全て片付いたら close してあげれば OK 。atomic 系で数えてあげてもいいけど、 sync.WaitGroup が好みです。

とあるページング的リクエストを分割して並列実行し、結果を配列に集めるパターン。

func someRequest(from, to uint, resultChan chan Result) {
    results := request(from, to)
    for _, r := range results {
        resultChan <- r
    }
}

func A() []Result {
    resultChan := make(chan Result, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 99; i++ {
        wg.Add(1)
        from := i * 100
        to := (i + 1) * 100
        go func() {
            defer wg.Done()
            someRequest(from, to, resultChan)
        }()
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    results := make([]Result, 0, 10000)
    for _, r := range resultChan {
        results = append(results, r)
    }

    return results
}

各送信側にキャンセルを伝えたい場合は専用の channel を渡してあげてもいいけど、 context を渡すのが楽。外部コマンドや外部リクエストにもそのまま渡せるので。

func someRequest(ctx context.context, from, to uint, resultChan chan Result) {
    results := request(ctx, from, to)
    for _, r := range results {
        resultChan <- r
    }
}

func A(ctx context.Context) []Result {
    resultChan := make(chan Result, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 99; i++ {
        wg.Add(1)
        from := i * 100
        to := (i + 1) * 100
        go func() {
            defer wg.Done()
            someRequest(ctx, from, to, resultChan)
        }()
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    results := make([]Result, 0, 10000)
    for _, r := range resultChan {
        results = append(results, r)
    }

    return results
}

送信側は複数 and 受信側は複数

Dispatcher - Worker 的な。 Dispatcher を独自に用意しているサンプルが散見されますが、型によって受け渡す worker を変えて云々、みたいな分岐が必要なければ channel 自体が dispatcher になります。

基本的には受信側が増える分には close を使えば問題なし。

func worker(queue chan Job) {
    for j := range queue {
        // do something
    }
}

func someRequest(from, to uint, callbackJobQueue chan Job) {
    results := request(from, to)
    for _, r := range results {
        callbackQueue <- NewJob(r)
    }
}

const (
    workerNum = 10
)

func A() {
    queue := make(chan Job, 100)
    defer close(queue)
    wg := new(sync.WaitGroup)

    for i := 0; i < workerNum; i++ {
        go worker(queue)
    }
    for i := 0; i < 99; i++ {
        wg.Add(1)
        from := i * 100
        to := (i + 1) * 100
        go func() {
            defer wg.Done()
            someRequest(from, to, queue)
        }()
    }

    wg.Wait()
}

結果を収集したければ worker に収集用の channel を渡せば OK 。

select 待ち受けについて

基本的には for-range をおすすめします。 select を使うのは以下のような場合かなーと。

  • ループではなく一回だけ発火するやーつ
    • 全て close 待ち
      • どれかが close されたら処理を抜ける場合。とはいえ発火順序は保証されないので使い所があまり思いつかなかった
    • 待ち受ける複数 channel のうち、一つしか返ってこないことが保証される場合
      • 非同期処理で resolve か reject のどちらかだけ返ってくる場合など。
    • 一個受けたらもう抜けちゃっていいやつ
      • 例えば signal 処理。プロセスを終了するなら一個だけ受ければいい
  • 抜ける気がないループ
    • プロセスが死ぬまで動かし続けるので色々気にしないでいい場合
  • ちゃんと送信側が片付いたことを全て確認できている
    • 前述の送信側が複数あるパターンと同じく、 sync.WaitGroup などでちゃんと管理できている場合。ただし、 for-range と違い全ての channel が受信済みであることも保証しないと受信漏れが生じるのでややこしい。
    • それぞれで for-range を組むほうがシンプルになりがち(個人の見解)

受信漏れの考慮に苦心しがちなので、 channel でループする場合は for-range に寄せていったほうが楽かなーと思っています。

timeout 処理や context.Done のハンドリングは悩ましいところですが、なるべく context の仕組みに頼り、どうしても必要なところは受信漏れを考慮した上で select 待ち受けするのがいいかと。 capacity なしの結果用 channel を返す上、 timeout 処理がちゃんと実装されていないメソッドがあったとしても、 context 対応でシンプルに使える wrapper を用意したいところ。

ちなみにありがちなミスとして、 select in for loop で close された channel があると暴走します。

func main() {
    ch1 := make(chan struct{})
    ch2 := make(chan struct{})
    close(ch1)

    for {
        select {
        case <- ch1:
            log.Println("1")
        case <- ch2:
            log.Println("2")
        }
    }
}

上記のように select-case で closed な channel があるとひたすら発火して 1 が出力され続けます。多段の event 待ち受けで受信側から送信側に送信停止を伝播させたい場合にやりがち。

func A(ch1 chan Event, stopA chan struct{}) {
    defer close(ch1)

    t := time.NewTicker(3 * time.Second)
    defer t.Stop()

    for {
        select {
        case <-t.C:
            ch1 <- handle()
        case <-stopA:
            return
        }
    }
}

func B(ch1 chan Event, ch2 chan Job, stopB chan struct{}, stopA chan struct{}) {
    defer close(ch2)
    for {
        select {
        case e, ok := <-ch1:
            if !ok {
                return
            }
            ch2 <- NewJob(e)
        case <-stopB:
            close(stopA)
        }
    }
}

func C(ch2 chan Job, stopB chan struct{}) {
    for j := range ch2 {
        if err := j.Run(); err != nil {
            close(stopB)
        }
    }
}

上記のような Event を受けて Job を dispatch するコードだと、 B の stopB の case が連続発火して stopA を多重 close し、 panic します。

対処としては channel ごとに待ち受ける goroutine を分けて処理したり、 nil で上書きして case 判定から除外したりでしょうか? 個人的には設計が怪しくなっていると判断してリファクタしたくなります。

エラー収集(一つだけ拾えば OK )

errgroup が楽すぎるので使っていきましょう

エラー収集(複数集める)

for-range で各処理から error を集め、処理が全て終わったら close します。

func A() []error {
    errChan := make(chan error, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := doSomething(); err != nil {
                errChan <- err
            }
        }()
    }

    go func() {
        wg.Wait()
        close(errChan)
    }()

    errors := make([]error, 0, 100)
    for r := range errChan {
        errors = append(errors, r)
    }

    return errors
}

エラー収集(結果も集める)

結果と error をまとめた struct を用意すれば error のみと同じように書けます。

type Wrapper struct {
    Result Result
    Error  error
}

func A() []Wrapper {
    resultChan := make(chan Wrapper, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            result, err := doSomething();
            resultChan <- Wrapper{
                Result: result,
                Error:  err,
            }
        }()
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    results := make([]Wrapper, 0, 100)
    for r := range resultChan {
        results = append(results, r)
    }

    return results
}

channel を増やして並べる書き方もできます。ただし、 capacity に注意しないとブロックします(以下の例だと errChan を先に処理しているので、 resultChan が飽和したら詰まる)。

func A() ([]Result, []error) {
    errChan := make(chan error, 100)
    resultChan := make(chan Result, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            result, err := doSomething();
            if err != nil {
                errChan <- err
                return
            }
            resultChan <- result
        }()
    }

    go func() {
        wg.Wait()
        close(errChan)
        close(resultChan)
    }()

    errors := make([]error, 0, 100)
    for r := range errChan {
        errors = append(errors, r)
    }
    results := make([]Result, 0, 100)
    for r := range resultChan {
        results = append(results, r)
    }

    return results, errors
}

前者のほうが安全ですが、個人的には後続処理で結果と error を分離するなら capacity に気をつけながら後者でもいいかなーと思っています。

以上

とりあえずこの辺のパターンを組み合わせれば頻出する処理は書けるかと思います。誤りや改善点など、何かありましたらぜひご指摘いただければ幸いです。

なお、実際には sync.WaitGroup を channel に変換しているところなどはメソッド化して使うのがオススメです。

個人的にはシンプルな for-range ではなく select-case の loop で色々やり始めたら黄色信号かなーと思っているのですが、使い所も確かにあるので難しい。

重めの処理を非同期 worker でゴリゴリやりたくなったらぜひ参考にしてみてください!

Share Comments
comments powered by Disqus