Golang并發模型:何時該用Goroutine?何時該用Channel?
作者:磊豐
Goroutine最佳實踐總是考慮Goroutine的退出機制,使用sync.WaitGroup來等待Goroutine完成,避免創建無限制的Goroutine(考慮工作池模式),為Goroutine添加恢復機制(defer + recover)
引言
- 簡要介紹Golang的并發模型
- Goroutine和Channel的基本概念
- 為什么需要理解它們的使用時機
第一部分:Goroutine的使用場景
1. 執行獨立任務時
// 示例:并發執行多個獨立HTTP請求
func fetchURL(url string, wg *sync.WaitGroup){
defer wg.Done()
resp, err := http.Get(url)
if err !=nil{
fmt.Printf("Error fetching %s: %v\n", url, err)
return
}
fmt.Printf("Fetched %s, status: %s\n", url, resp.Status)
}
func main(){
urls :=[]string{"https://golang.org","https://google.com","https://github.com"}
var wg sync.WaitGroup
for_, url :=range urls {
wg.Add(1)
go fetchURL(url,&wg)
}
wg.Wait()
fmt.Println("All requests completed!")
}2. 需要并行計算時
// 示例:并行計算斐波那契數列
func calculateFibonacci(n int, result chan<-int){
if n <=1{
result <- n
return
}
a :=make(chanint)
b :=make(chanint)
go calculateFibonacci(n-1, a)
go calculateFibonacci(n-2, b)
result <-(<-a +<-b)
}
func main(){
result :=make(chanint)
go calculateFibonacci(10, result)
fmt.Printf("Fib(10) = %d\n",<-result)
}3. 處理I/O密集型操作時
// 示例:并發讀取多個文件
func readFile(filename string, results chan<-string){
data, err := os.ReadFile(filename)
if err !=nil{
results <- fmt.Sprintf("Error reading %s: %v", filename, err)
return
}
results <- fmt.Sprintf("Read %s: %d bytes", filename,len(data))
}
func main(){
files :=[]string{"file1.txt","file2.txt","file3.txt"}
results :=make(chanstring,len(files))
for_, file :=range files {
go readFile(file, results)
}
for range files {
fmt.Println(<-results)
}
}4. 實現后臺任務時
// 示例:后臺日志處理器
func logProcessor(logs <-chanstring){
for logEntry :=range logs {
// 模擬處理延遲
time.Sleep(100* time.Millisecond)
fmt.Printf("Processed log: %s\n", logEntry)
}
}
func main(){
logChan :=make(chanstring,100)
go logProcessor(logChan)
// 模擬生成日志
for i :=0; i <10; i++{
logChan <- fmt.Sprintf("Log entry %d", i)
}
close(logChan)
time.Sleep(1* time.Second)// 等待處理完成
}第二部分:Channel的使用場景
1. Goroutine間通信時
// 示例:生產者-消費者模型
func producer(items chan<-int){
for i :=0; i <5; i++{
items <- i
time.Sleep(time.Second)
}
close(items)
}
func consumer(id int, items <-chanint){
for item :=range items {
fmt.Printf("Consumer %d received: %d\n", id, item)
}
}
func main(){
items :=make(chanint)
go producer(items)
// 啟動多個消費者
for i :=1; i <=3; i++{
go consumer(i, items)
}
time.Sleep(6* time.Second)
}2. 同步Goroutine執行時
// 示例:使用Channel同步多個Goroutine
func worker(id int, ready <-chanstruct{}, done chan<-int){
<-ready // 等待開始信號
fmt.Printf("Worker %d started\n", id)
time.Sleep(time.Duration(rand.Intn(3))* time.Second)
fmt.Printf("Worker %d finished\n", id)
done <- id
}
func main(){
const numWorkers =5
ready :=make(chanstruct{})
done :=make(chanint, numWorkers)
// 啟動workers
for i :=1; i <= numWorkers; i++{
go worker(i, ready, done)
}
// 同時開始所有workers
close(ready)
// 等待所有workers完成
for i :=1; i <= numWorkers; i++{
<-done
}
fmt.Println("All workers completed")
}3. 實現管道模式時
// 示例:數據處理管道
func stage1(in <-chan int)<-chan int{
out :=make(chan int)
go func(){
for n :=range in {
out <- n *2
}
close(out)
}()
return out
}
func stage2(in <-chan int)<-chan int{
out :=make(chan int)
go func(){
for n :=range in {
out <- n +1
}
close(out)
}()
return out
}
func stage3(in <-chan int)<-chan string{
out :=make(chan string)
go func(){
for n :=range in {
out <- fmt.Sprintf("Result: %d", n)
}
close(out)
}()
return out
}
func main(){
// 創建輸入channel
in :=make(chanint)
// 構建管道
pipeline :=stage3(stage2(stage1(in)))
// 發送數據
go func(){
for i :=0; i <5; i++{
in <- i
}
close(in)
}()
// 接收結果
for result :=range pipeline {
fmt.Println(result)
}
}4. 限制并發數量時
// 示例:使用緩沖Channel實現工作池
func worker(id int, jobs <-chan int, results chan<-int){
for j :=range jobs {
fmt.Printf("Worker %d started job %d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished job %d\n", id, j)
results <- j *2
}
}
func main(){
const numJobs =10
const numWorkers =3
jobs :=make(chanint, numJobs)
results :=make(chanint, numJobs)
// 啟動workers
for w :=1; w <= numWorkers; w++{
go worker(w, jobs, results)
}
// 發送jobs
for j :=1; j <= numJobs; j++{
jobs <- j
}
close(jobs)
// 收集結果
for a :=1; a <= numJobs; a++{
<-results
}
}第三部分:Goroutine和Channel的聯合使用
1. 扇出模式
// 示例:一個生產者,多個消費者
func producer(nums chan<-int){
for i :=0; i <10; i++{
nums <- i
}
close(nums)
}
func consumer(id int, nums <-chan int, done chan<-bool){
for num :=range nums {
fmt.Printf("Consumer %d got %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(500))* time.Millisecond)
}
done <-true
}
func main(){
nums :=make(chanint)
done :=make(chanbool)
go producer(nums)
// 啟動多個消費者
for i :=0; i <3; i++{
go consumer(i, nums, done)
}
// 等待所有消費者完成
for i :=0; i <3; i++{
<-done
}
}2. 扇入模式
// 示例:多個生產者,一個消費者
func producer(id int, nums chan<-int){
for i :=0; i <3; i++{
nums <- id*10+ i
time.Sleep(time.Duration(rand.Intn(500))* time.Millisecond)
}
}
func consumer(nums <-chanint, done chan<-bool){
for num :=range nums {
fmt.Printf("Received: %d\n", num)
}
done <-true
}
func main(){
nums :=make(chanint)
done :=make(chanbool)
// 啟動多個生產者
for i :=0; i <3; i++{
go producer(i, nums)
}
// 啟動消費者
go func(){
time.Sleep(2* time.Second)
close(nums)
}()
go consumer(nums, done)
<-done
}3. 超時控制
// 示例:使用select實現超時控制
func longRunningTask(result chan<-string){
// 模擬長時間運行的任務
time.Sleep(3* time.Second)
result <-"Task completed"
}
func main(){
result :=make(chanstring)
go longRunningTask(result)
select{
case res :=<-result:
fmt.Println(res)
case<-time.After(2* time.Second):
fmt.Println("Task timed out")
}
}第四部分:最佳實踐與常見陷阱
Goroutine最佳實踐
- 總是考慮Goroutine的退出機制
- 使用sync.WaitGroup來等待Goroutine完成
- 避免創建無限制的Goroutine(考慮工作池模式)
- 為Goroutine添加恢復機制(defer + recover)
Channel最佳實踐
- 明確Channel的所有權(哪個Goroutine負責關閉)
- 使用緩沖Channel來解耦生產者和消費者
- 考慮使用context.Context來取消Channel操作
- 小心nil Channel和已關閉Channel的操作
常見陷阱
- Goroutine泄漏
- Channel死鎖
- 共享內存競爭條件
- 不正確的Channel關閉
結論
- 總結Goroutine和Channel的核心使用場景
- 強調根據具體需求選擇合適的并發模式
- 推薦進一步學習資源(官方文檔、經典書籍等)
責任編輯:武曉燕
來源:
Go語言圈
























