講完Go并發(fā)控制,講講并發(fā)抑制
已知有一個函數(shù)search,能夠按照關鍵詞執(zhí)行搜索,coSearch能夠批量并發(fā)查詢。
讓我們把目光定位到search上,search通過查詢數(shù)據(jù)庫或者調(diào)用其他api來完成搜索,這是一個相對耗時和消耗資源的操作。
當多個相同的關鍵詞并發(fā)查詢(調(diào)用search函數(shù))時,我們希望只產(chǎn)生一次數(shù)據(jù)庫調(diào)用(調(diào)用query),第一個查詢未完成時后續(xù)的重復查詢會等待,當?shù)谝粋€查詢完成時則會與其他查詢分享結(jié)果,這樣一來雖然只執(zhí)行了一次數(shù)據(jù)庫調(diào)用但是所有查詢都拿到了最終的結(jié)果。
圖片
什么是并發(fā)抑制:
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
func query(ctx context.Context, word string) (string, error) {
fmt.Println("searching: ", word)
time.Sleep(5 * time.Second)
return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}
// 實現(xiàn)search,在重復并發(fā)調(diào)用下僅執(zhí)行一次query
// 其他并發(fā)共享這次query的結(jié)果
func search(ctx context.Context, word string) (string, error) {
return query(ctx, word)
}
func coSearch(ctx context.Context, words []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)
results := make([]string, len(words))
for i, word := range words {
i, word := i, word
g.Go(func() error {
result, err := search(ctx, word)
if err != nil {
return err
}
results[i] = result
return nil
})
}
err := g.Wait()
return results, err
}
func main() {
words := []string{"Go","Go", "Go", "Rust", "PHP", "JavaScript", "Java"}
results, err := coSearch(context.Background(), words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}好了,可以先暫停想想該如何實現(xiàn)search函數(shù)了。
一步一步實現(xiàn)并發(fā)抑制
我們先假設所有查詢關鍵詞都一樣,那么問題簡化成并發(fā)執(zhí)行search時,只在第一次search時調(diào)用query,其他的search并發(fā)調(diào)用等待并共享這次的查詢結(jié)果。
通過waiting變量,其他goroutine等待第一個goroutine數(shù)據(jù)庫調(diào)用完成,那么如何讓其他goroutine等待在這個位置呢?
func main() {
words := []string{"Go", "Go", "Go", "Go", "Go"}
results, err := coSearch(context.Background(), words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}
var (
waiting bool
resp string
err error
)
func search(ctx context.Context, word string) (string, error) {
if waiting {
// 等待resp, err被賦值,即第一個query完成后再返回
// ...?
return resp, err
}
waiting = true
resp, err = query(ctx, word)
waiting = false
return resp, err
}
func query(ctx context.Context, word string) (string, error) {
fmt.Println("searching: ", word)
time.Sleep(5 * time.Second)
return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}sync.WaitGroup{}并發(fā)控制
sync.WaitGroup{}是并發(fā)控制的核心,這里再次重申下用法:
- 當新運行一個goroutine時,我們需要調(diào)用wg.Add(1)。
- 當一個goroutine運行完成的時候,我們需要調(diào)用wg.Done()。
- wg.Wait()讓程序阻塞在此處,直到所有的goroutine運行完畢。
利用 sync.WaitGroup{}便可實現(xiàn)上文代碼中等待的效果:
var (
wg sync.WaitGroup
waiting bool
resp string
err error
)
func search(ctx context.Context, word string) (string, error) {
if waiting {
// 其他goroutine等待第一個goroutine執(zhí)行完成
wg.Wait()
return resp, err
}
waiting = true
wg.Add(1)
resp, err = query(ctx, word)
wg.Done() // 第一個goroutine執(zhí)行完成
waiting = false
return resp, err
}并發(fā)安全
當多個goroutine對同一個內(nèi)存區(qū)域進行讀寫時,就會產(chǎn)生并發(fā)安全的問題,它會導致程序運行的結(jié)果不符合預期,而上文的程序并發(fā)的讀寫了waiting變量,需要給waiting變量加把鎖。
釋放鎖的位置非常的有技巧,如果在在wg.Add(1)之前mu.Unlock(),可能 wg.Add(1)還未來得執(zhí)行其他goroutine已經(jīng)執(zhí)行了wg.Wait(),并獲取到了錯誤的數(shù)據(jù)。
unlock在add之前;
var (
wg sync.WaitGroup
mu sync.Mutex
waiting bool
resp string
err error
)
func search(ctx context.Context, word string) (string, error) {
mu.Lock()
if waiting {
mu.Unlock()
wg.Wait()
return resp, err
}
waiting = true
wg.Add(1)
// 在wg.Add(1)之后釋放鎖,保證其他goroutine被wg.Wait()阻塞
mu.Unlock()
resp, err = query(ctx, word)
wg.Done()
mu.Lock()
waiting = false
mu.Unlock()
return resp, err
}完整版本
現(xiàn)在可以針對不同的關鍵詞做區(qū)分了,使用一個map來代替原有的waiting,并將每一個關鍵詞查詢的WaitGroup和結(jié)果打包到map的value中。
type call struct {
wg sync.WaitGroup
resp string
err error
}
var (
mu sync.Mutex
m = make(map[string]*call)
)
func search(ctx context.Context, word string) (string, error) {
mu.Lock()
if c, ok := m[word]; ok {
mu.Unlock()
c.wg.Wait()
return c.resp, c.err
}
c := &call{}
m[word] = c
c.wg.Add(1)
// 在wg.Add(1)之后才釋放鎖,保證其他goroutine被wg.Wait()阻塞
mu.Unlock()
c.resp, c.err = query(ctx, word)
c.wg.Done()
mu.Lock()
delete(m, word)
mu.Unlock()
return c.resp, c.err
}開源庫 golang.org/x/sync/singleflight
上面一步一步教大家手搓了一個并發(fā)抑制的邏輯,我們的基本邏輯和開源庫golang.org/x/sync/singleflight沒有區(qū)別,只是singleflight內(nèi)部實現(xiàn)更加嚴謹
直接使用singleflight非常簡單的就可以實現(xiàn)我們的訴求
- singleflight.Group 創(chuàng)建一個需要并發(fā)控制的范圍
- Do函數(shù)
第一個參數(shù)接收一個key來判斷否重復調(diào)用
第二個參數(shù)為要執(zhí)行的函數(shù),函數(shù)可以返回正常值或者error
Do函數(shù)返回值除了閉包函數(shù)的返回值之外,還返回了此次返回值是否由其他goroutine共享
import (
"golang.org/x/sync/singleflight"
)
var g = new(singleflight.Group)
func search(ctx context.Context, word string) (string, error) {
resp, err, _ := g.Do(word, func() (interface{}, error) {
return query(ctx, word)
})
return resp.(string), err
}錯誤處理
因為共享第一個goroutine的結(jié)果,因此如果第一次調(diào)用失敗,那其他goroutine也都會失敗
如果在某些場景下允許第一個調(diào)用失敗后再次嘗試調(diào)用該函數(shù),那么可以通過調(diào)用Forget方法來忘記這個key
var g = new(singleflight.Group)
func search(ctx context.Context, word string) (string, error) {
resp, err, _ := g.Do(word, func() (interface{}, error) {
val, err := query(ctx, word)
// 當出錯并且允許重試時
if err != nil && true {
g.Forget(word)
return "", err
}
return val, err
})
return resp.(string), err
}超時控制
當使用Do函數(shù)時,如果query長時間未響應(這里假設qeury不具備超時能力),那么所有的goroutine都會被阻塞并等待,利用DoChan+select可以實現(xiàn)超時邏輯
var g = new(singleflight.Group)
func search(ctx context.Context, word string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
result := g.DoChan(word, func() (interface{}, error) {
return query(ctx, word)
})
select {
case r := <-result:
return r.Val.(string), r.Err
case <-ctx.Done():
return "", ctx.Err()
}
}使用場景
預防緩存穿透
在高并發(fā)的狀態(tài)下,一般會給熱點數(shù)據(jù)設置緩存。但數(shù)據(jù)第一次訪問或者緩存失效的狀態(tài)下,如果直接去查詢數(shù)據(jù)庫,會給數(shù)據(jù)庫造成極大壓力,甚至直接打爆數(shù)據(jù)庫。
以上各種分享中被反復提到的場景,但!注意!使用singleflight就一勞永逸了么,不是的,在大規(guī)模集群下可能有數(shù)百臺機器,當處在高并發(fā)狀態(tài)時,即使每臺機器只發(fā)起一個請求,也足以打爆你的數(shù)據(jù)庫!結(jié)合實際,搭配適當?shù)木彺娌呗浴?shù)據(jù)預熱、限流等手段才能避免潛在的風險。挖個坑,以后有機會聊聊這些問題
總結(jié)
本篇作為一個例子,給你講透典型的Go并發(fā)控制的姊妹篇,講述了另外一種并發(fā)控制模型,并介紹了開源庫golang.org/x/sync/singleflight。
當由一個goroutine并發(fā)向下發(fā)展成多個goroutine時,使用golang.org/x/sync/errgroup
當多個goroutine并發(fā)向下抑制成一個goroutine時,使用golang.org/x/sync/singleflight





























