精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Go netpoll 的實現和使用,你學會了嗎?

開發 前端
Go 目前使用的 IO 模型是 IO 多路復用,由 netpoll 模塊實現,這部分沒有特別的地方,重點是 Go 把 IO 模型和協程結合一起,大概的流程如下。

IO 模型是軟件中非常重要的部分,軟件架構也隨著 IO 模型的變化而變化,比如服務器架構經歷了一個請求一個進程+阻塞式 IO,一個請求一個線程+阻塞式 IO,IO 多路復用+非阻塞 IO,異步 IO。現代軟件中,大多數軟件使用的 IO 模型是 IO 多路復用,因為它的平臺兼容性比較好一點,但是慢慢也有不少軟件支持了異步 IO,比如 Node.js 已經支持了 io_uring。Go 目前使用的 IO 模型是 IO 多路復用,由 netpoll 模塊實現,這部分沒有特別的地方,重點是 Go 把 IO 模型和協程結合一起,大概的流程如下。

  1. 當一個協程讀且不滿足條件時,Go 會把協程記錄到 pollDesc 中,接著把它改成等待狀態并觸發重新調度。
  2. Go 會定時或按需調用 netpoll 獲取就緒的事件。
  3. 通過 netpoll 返回的事件信息找到對應的 pollDesc,并根據 pollDesc 找到對應的協程,把協程改成就緒狀態等待調度執行。

本文介紹 netpoll 的實現以及它是如何和協程結合起來的。

netpool

核心數據結構

了解 netpoll 之前,首先需要先了解 netpoll 模塊的核心數據結構 pollDesc。

type pollDesc struct {
 fd    uintptr        // constant for pollDesc usage lifetime
 fdseq atomic.Uintptr // protects against stale pollDesc
  // 記錄一些信息,比如是否有錯誤,是否超時等
 atomicInfo atomic.Uint32 // atomic pollInfo

  /*
  核心字段
    Nil:初始化狀態
    pdReady:事件就緒,
    pdWait:準備進入阻塞狀態
    其他:協程結構體的地址
  */
 rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
 wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil

 lock    mutex // protects the following fields
  // 只在某些系統使用,如 aix,記錄 pollDesc 在數組中的索引
 user    uint32    // user settable cookie
 closing bool
  // 超時管理
 rrun    bool      // whether rt is running
 wrun    bool      // whether wt is running
 rseq    uintptr   // protects from stale read timers
 rt      timer     // read deadline timer
 rd      int64     // read deadline (a nanotime in the future, -1 when expired)
 wseq    uintptr   // protects from stale write timers
 wt      timer     // write deadline timer
 wd      int64     // write deadline (a nanotime in the future, -1 when expired)
 self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

一個 fd 對應一個 pollDesc,pollDesc 負責管理 fd 的讀寫事件、等待事件的超時時間以及記錄阻塞等待 fd 事件的協程。netpoll 根據 pollDesc 的信息,通過操作系統的 IO 多路復用模塊實現對事件的感知,比如 Linux 的 epoll 和 MacOS 的 kqueue 的 IO 多路復用模塊。

IO 多路復用

IO 多路復用模塊需要實現下面幾個接口。

// 初始化,比如創建一個 epoll 實例,后面通過該實例監聽 fd 的事件
func netpollinit() {
}

// 判斷 fd 是不是 netpoll 內部的 fd,IO 多路復用實例的 fd 或內部創建的 pipe fd
func netpollIsPollDescriptor(fd uintptr) bool {
 return false
}


// 注冊 fd 事件
func netpollopen(fd uintptr, pd *pollDesc) int32 {
 return 0
}

// 取消 fd 的事件
func netpollclose(fd uintptr) int32 {
 return 0
}

// 特定平臺才實現,更新 fd 事件,喚醒 IO 多路復用模塊注冊 fd 事件
func netpollarm(pd *pollDesc, mode int) {
}

// 喚醒 IO 多路復用模塊
func netpollBreak() {
}

/*
  獲取就緒的 fd
  delay = 0: 非阻塞
  delay < 0: 阻塞直到有就緒 fd,阻塞期間可以通過 netpollBreak 喚醒
  delay > 0: 帶超時的阻塞
*/
func netpoll(delay int64) (gList, int32) {
 return gList{}, 0
}

接下來看一下 MacOS 系統下 kqueue 的實現。

package runtime

var (
 kq             int32         = -1
 netpollWakeSig atomic.Uint32 // 是否已經發送了喚醒 kqueue
)

func netpollinit() {
  // 創建 IO 多路復用模塊的實例
 kq = kqueue()
  // 設置 _FD_CLOEXEC 標記,fork + execve 時自動關閉該 fd,避免子進程繼承
 closeonexec(kq)
  // 注冊 EVFILT_USER,后續可以手動喚醒 IO 多路復用模塊
 ev := keventt{
  ident:  kqIdent,
  filter: _EVFILT_USER,
  flags:  _EV_ADD,
 }
 kevent(kq, &ev, 1, nil, 0, nil)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
 var ev [2]keventt
  // 記錄 fd
 *(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
  // 注冊讀寫事件,工作方式是邊緣觸發
 ev[0].filter = _EVFILT_READ
 ev[0].flags = _EV_ADD | _EV_CLEAR
 ev[0].fflags = 0
 ev[0].data = 0
  // 記錄對應的 pollDesc,就緒處理時使用
  ev[0].udata = (*byte)(unsafe.Pointer(pd))
  // 復制結構體
 ev[1] = ev[0]
 ev[1].filter = _EVFILT_WRITE
 kevent(kq, &ev[0], 2, nil, 0, nil)
}

func netpollclose(fd uintptr) int32 {
 // Don't need to unregister because calling close()
 // on fd will remove any kevents that reference the descriptor.
 return 0
}

func netpollarm(pd *pollDesc, mode int) {
 throw("runtime: unused")
}

func netpollBreak() {
 // Failing to cas indicates there is an in-flight wakeup, so we're done here.
 if !netpollWakeSig.CompareAndSwap(0, 1) {
  return
 }
  // 喚醒 IO 多路復用模塊
 ev := keventt{
  ident:  kqIdent,
  filter: _EVFILT_USER,
  flags:  _EV_ENABLE,
  fflags: _NOTE_TRIGGER,
 }
 kevent(kq, &ev, 1, nil, 0, nil)
}

// delay < 0: 一直阻塞,直到有就緒事件或被主動喚醒
// delay == 0: 非阻塞
// delay > 0: 帶超時的阻塞
// 返回就緒的協程和數量
func netpoll(delay int64) (gList, int32) {
 var tp *timespec
 var ts timespec
  
 if delay < 0 {
  tp = nil
 } else if delay == 0 {
  tp = &ts
 } else {
  ts.setNsec(delay)
  if ts.tv_sec > 1e6 {
   // Darwin returns EINVAL if the sleep time is too long.
   ts.tv_sec = 1e6
  }
  tp = &ts
 }
 var events [64]keventt
retry:
  // 獲取就緒事件
 n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
 var toRun gList
 delta := int32(0)
 for i := 0; i < int(n); i++ {
  ev := &events[i]
    // 是否是手動喚醒事件
  if isWakeup(ev) {
      // 是阻塞式調用時才處理
   if delay != 0 {
    // netpollBreak could be picked up by a nonblocking poll.
    // Only call drainWakeupEvent and reset the netpollWakeSig if blocking.
    ev := keventt{
          ident:  kqIdent,
          filter: _EVFILT_USER,
          flags:  _EV_DISABLE,
        }
        // 禁用,下次手動觸發時再開啟
        kevent(kq, &ev, 1, nil, 0, nil)
        // 恢復標記
    netpollWakeSig.Store(0)
   }
   continue
  }

  var mode int32
  switch ev.filter {
  case _EVFILT_READ:
   mode += 'r'
  case _EVFILT_WRITE:
   mode += 'w'
  }
  if mode != 0 {
   var pd *pollDesc
      // 獲取 pollDesc
   pd = (*pollDesc)(unsafe.Pointer(ev.udata))
      // 記錄錯誤
   pd.setEventErr(ev.flags == _EV_ERROR, tag)
      // 修改協程為就緒狀態,返回喚醒的協程數
   delta += netpollready(&toRun, pd, mode)
  }
 }
  // 返回就緒的協程和數量
 return toRun, delta
}

IO 多路復用的工作方式有兩種,Go 使用的是邊緣觸發。

  1. 邊緣觸發:邊緣觸發是從無到有數據時才會通知,需要通過非阻塞方式(避免讀到沒數據時阻塞)不斷讀取數據,否則不會收到新通知。
  2. 水平觸發:水平觸發就是有數據時會一直通知用戶,用戶可以選擇什么時候讀,讀取時依然需要使用非阻塞的方式。比如驚群場景下,一個連接的到來導致多個進程被喚醒收到可讀事件,但是先被調度的進程會消費這個連接,導致其他進程讀取時已經沒有連接了,如果以阻塞方式調用會引起進程阻塞。

netpoll 模塊除了提供常用的 fd 事件訂閱發布外,還有一個重要的能力就是可喚醒,比如當前定時器最早超時時間是 5s,然后 IO 多路復用模塊阻塞等待 5s,過去 1s 時突然新增了一個超時時間為 1s 的定時器,這時候需要提前喚醒 IO 多路復用模塊,從而及時處理定時器。在某些系統的中可以通過特殊的事件直接喚醒,比如上面的 EVFILT_USER 類型的事件,而比較常用實現方式是通過 pipe 創建兩個 fd,分別為讀 fd 和 寫 fd,把讀 fd 注冊到 IO 多路復用模塊中,然后通過往寫 fd 寫入數據來喚醒 IO 多路復用模塊。

// 創建兩個 fd,一個注冊到 kqueue
func addWakeupEvent(kq int32) {
 r, w, errno := nonblockingPipe()
 ev := keventt{
  filter: _EVFILT_READ,
  flags:  _EV_ADD,
 }
 *(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
 n := kevent(kq, &ev, 1, nil, 0, nil)
 netpollBreakRd = uintptr(r)
 netpollBreakWr = uintptr(w)
}

// 通過往 fd 寫喚醒 kqueue
func wakeNetpoll(_ int32) {
 for {
  var b byte
  n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
  if n == 1 || n == -_EAGAIN {
   break
  }
  if n == -_EINTR {
   continue
  }
 }
}

// 判斷是不是內部 fd,即上面創建的 fd
func isWakeup(ev *keventt) bool {
 if uintptr(ev.ident) == netpollBreakRd {
  if ev.filter == _EVFILT_READ {
   return true
  }
  println("runtime: netpoll: break fd ready for", ev.filter)
  throw("runtime: netpoll: break fd ready for something unexpected")
 }
 return false
}

// 消費喚醒時寫入的數據,一個喚醒結束
func drainWakeupEvent(_ int32) {
 var buf [16]byte
 read(int32(netpollBreakRd), noescape(unsafe.Pointer(&buf[0])), int32(len(buf)))
}

超時管理

netpoll 支持等待事件就緒的超時時間,其原理是設置一個定時器,然后超時時把協程改成就緒狀態等待調度執行。

func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
 lock(&pd.lock)
  // 舊值
 rd0, wd0 := pd.rd, pd.wd
  // 讀寫超時
 combo0 := rd0 > 0 && rd0 == wd0
 if d > 0 {
    // 絕對超時時間
  d += nanotime()
 }
  // 記錄設置了讀或寫超時
 if mode == 'r' || mode == 'r'+'w' {
  pd.rd = d
 }
 if mode == 'w' || mode == 'r'+'w' {
  pd.wd = d
 }
 pd.publishInfo()
  // 設置了超時并且讀寫一樣
 combo := pd.rd > 0 && pd.rd == pd.wd
  // 設置定時器回調,處理讀寫超時
 rtf := netpollReadDeadline
 if combo {
  rtf = netpollDeadline
 }
  // 第一次設置
 if !pd.rrun {
    // 大于 0,修改定時器時間為 rd,處理函數為 rtf
  if pd.rd > 0 {
   pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
   pd.rrun = true
  }
 } else if pd.rd != rd0 || combo != combo0 { // 新舊的超時時間不一樣
  pd.rseq++ // invalidate current timers
    // 修改
  if pd.rd > 0 {
   pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
  } else {
      // 停止定時器
   pd.rt.stop()
   pd.rrun = false
  }
 }
  // 寫也差不多,忽略
  
  // 超時時間小于 0 才會執行下面的邏輯
 delta := int32(0)
 var rg, wg *g
  // 新的超時時間小于 0,則修改 pollDesc 狀態為 Nil 并喚醒阻塞的協程
 if pd.rd < 0 {
  rg = netpollunblock(pd, 'r', false, &delta)
 }
 if pd.wd < 0 {
  wg = netpollunblock(pd, 'w', false, &delta)
 }
 unlock(&pd.lock)
  // 把協程改成就緒狀態,等待調度
 if rg != nil {
  netpollgoready(rg, 3)
 }
 if wg != nil {
  netpollgoready(wg, 3)
 }
  // 等待者數量減去 delta
 netpollAdjustWaiters(delta)
}

func netpollgoready(gp *g, traceskip int) {
 goready(gp, traceskip+1)
}

func goready(gp *g, traceskip int) {
 systemstack(func() {
  ready(gp, traceskip, true)
 })
}

func ready(gp *g, traceskip int, next bool) {
 status := readgstatus(gp)
 mp := acquirem() 
  // 改成就緒狀態
 casgstatus(gp, _Gwaiting, _Grunnable)
  // 加入 p 的隊列等待調度執行
 runqput(mp.p.ptr(), gp, next)
 wakep()
 releasem(mp)
}

poll_runtime_pollSetDeadline 用于設置一個協程超時等待某個 fd 事件,支持取消超時設置。當超時時執行 netpolldeadlineimpl 把協程改成就緒狀態等待調度執行。

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
 lock(&pd.lock)
 delta := int32(0)
 var rg *g
  // 修改 pollDesc 的狀態
 if read {
  pd.rd = -1
  pd.publishInfo()
  rg = netpollunblock(pd, 'r', false, &delta)
 }
 // write 寫同上
 unlock(&pd.lock)
  // 把協程改成就緒狀態
 if rg != nil {
  netpollgoready(rg, 0)
 }
 netpollAdjustWaiters(delta)
}

netpool 在 Go 中的使用

IO 多路復用模塊只是提供了一些基礎的能力,那么這些能力是如何和 Go 結合起來的呢?下面從啟動一個 TCP 服務器為例,看看 netpoll 模塊是如何和 Go 結合起來的。

func Listen(network, address string) (Listener, error) {
 var lc ListenConfig
 return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
 l, err = sl.listenTCP(ctx, la)
 return l, nil
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
 return sl.listenTCPProto(ctx, laddr, 0)
}

func (sl *sysListener) listenTCPProto(ctx context.Context, laddr *TCPAddr, proto int) (*TCPListener, error) {
 fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, proto, "listen", ctrlCtxFn)
 return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
 family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
 return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlCtxFn)
}

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
 s, err := sysSocket(family, sotype, proto)
 fd, err = newFD(s, family, sotype, net)
  fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn)
 return fd, nil
}

func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
  return os.NewSyscallError("bind", err)
 }
 if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
  return os.NewSyscallError("listen", err)
 }
 if err = fd.init(); err != nil {
  return err
 }
 return nil
}

創建一個 TCP 服務器首先創建了一個 socket,接著綁定到監聽到地址和把 socket 改成 listen 狀態,這樣就完成了服務器的啟動,啟動成功后執行 fd.init() 把 socket 對應的 fd 注冊到 IO 多路復用模塊。

func (fd *netFD) init() error {
 return fd.pfd.Init(fd.net, true)
}

func (fd *FD) Init(net string, pollable bool) error {
 err := fd.pd.init(fd)
 return err
}

func (pd *pollDesc) init(fd *FD) error {
  // 懶初始化 IO 多路復用模塊
 serverInit.Do(runtime_pollServerInit)
  // 注冊 fd 讀寫事件
 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
  // 記錄這個上下文,對應一個 pollDesc 結構體,后續用到
 pd.runtimeCtx = ctx
 return nil
}

接著就調 netpoll 模塊的 poll_runtime_pollOpen。

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
  // 從 cache 里分配一個 pollDesc
 pd := pollcache.alloc()
 lock(&pd.lock)
 wg := pd.wg.Load()
 rg := pd.rg.Load()
 pd.fd = fd
 pd.closing = false
 pd.setEventErr(false, 0)
 pd.rseq++
 pd.rg.Store(pdNil)
 pd.rd = 0
 pd.wseq++
 pd.wg.Store(pdNil)
 pd.wd = 0
 pd.self = pd
 pd.publishInfo()
 unlock(&pd.lock)
  // 注冊 fd 
 errno := netpollopen(fd, pd)
 return pd, 0
}

這樣就完成了服務器的啟動和 socket fd 事件的注冊,接著就調 accept 等待連接的到來。

func (l *TCPListener) Accept() (Conn, error) {
 c, err := l.accept()
 return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
 fd, err := ln.fd.accept()
 return newTCPConn(fd, ...), nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
 d, rsa, errcall, err := fd.pfd.Accept()
  // 拿到 fd,創建一個 netFD
 netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
  // 再次注冊到 IO 多路復用模塊,進行數據通信
 netfd.init()
 return netfd, nil
}

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
  // 加鎖
 if err := fd.readLock(); err != nil {
  return -1, nil, "", err
 }
 defer fd.readUnlock()
  // 讀判斷是否有錯誤,有則返回
 if err := fd.pd.prepareRead(fd.isFile); err != nil {
  return -1, nil, "", err
 }
  
 for {
  s, rsa, errcall, err := accept(fd.Sysfd)
  if err == nil {
   return s, rsa, "", err
  }
  switch err {
  case syscall.EAGAIN:
   if fd.pd.pollable() {
        // 阻塞等待,重新 accept
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  return -1, nil, errcall, err
 }
}

這里有兩個地方涉及到 netpoll,分別是 prepareRead 和 accept。

func (pd *pollDesc) prepareRead(isFile bool) error {
 return pd.prepare('r', isFile)
}

func convertErr(res int, isFile bool) error {
 switch res {
 case pollNoError:
  return nil
 case pollErrClosing:
  return errClosing(isFile)
 case pollErrTimeout:
  return ErrDeadlineExceeded
 case pollErrNotPollable:
  return ErrNotPollable
 }
}

func (pd *pollDesc) prepare(mode int, isFile bool) error {
 if pd.runtimeCtx == 0 {
  return nil
 }
 res := runtime_pollReset(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

func poll_runtime_pollReset(pd *pollDesc, mode int) int {
  // 判斷是否有錯誤
 errcode := netpollcheckerr(pd, int32(mode))
  // 有則返回
 if errcode != pollNoError {
  return errcode
 }
 if mode == 'r' {
  pd.rg.Store(pdNil)
 } else if mode == 'w' {
  pd.wg.Store(pdNil)
 }
 return pollNoError
}

func netpollcheckerr(pd *pollDesc, mode int32) int {
 info := pd.info()
 if info.closing() {
  return pollErrClosing
 }
  // 超時
 if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
  return pollErrTimeout
 }
 if mode == 'r' && info.eventErr() {
  return pollErrNotPollable
 }
 return pollNoError
}

如果沒有發生錯誤則繼續調 accept,當沒有連接時,accept 會返回 EAGAIN(非阻塞調用條件不滿足時的錯誤碼),接著執行 waitRead。

if fd.pd.pollable() {
  if err = fd.pd.waitRead(fd.isFile); err == nil {
    continue
  }
}

func (pd *pollDesc) waitRead(isFile bool) error {
 return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
 if pd.runtimeCtx == 0 {
  return errors.New("waiting for unsupported file type")
 }
 res := runtime_pollWait(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
 errcode := netpollcheckerr(pd, int32(mode))
 if errcode != pollNoError {
  return errcode
 }
  // 阻塞當前協程
 for !netpollblock(pd, int32(mode), false) {
  errcode = netpollcheckerr(pd, int32(mode))
  if errcode != pollNoError {
   return errcode
  }
 }
 return pollNoError
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 for {
  // 執行到這時,可能已經就緒了,則返回,因為 Go 是多線程的,一個線程讀的時候,另一個線程可能寫
  if gpp.CompareAndSwap(pdReady, pdNil) {
   return true
  }
    // 一般情況,設置為 pdWait 狀態,表示準備進入阻塞狀態
  if gpp.CompareAndSwap(pdNil, pdWait) {
   break
  }
 }
  // waitio 是 false,但一般沒有 error,執行 gopark 阻塞協程
 if waitio || netpollcheckerr(pd, mode) == pollNoError {
  gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
 }
  // 被喚醒,重置為 pdNil
 old := gpp.Swap(pdNil)
 return old == pdReady
}

netpollblock 最終調 gopark 阻塞協程。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, ...) {
 mp.waitlock = lock
 mp.waitunlockf = unlockf
 releasem(mp)
 mcall(park_m)
}

func park_m(gp *g) {
 mp := getg().m
  // 修改協程為 _Gwaiting 狀態
 casgstatus(gp, _Grunning, _Gwaiting)
  // 執行 waitunlockf,即 netpollblockcommit
 if fn := mp.waitunlockf; fn != nil {
  ok := fn(gp, mp.waitlock)
  mp.waitunlockf = nil
  mp.waitlock = nil
    // 如果返回 false,則說明就緒了,修改協程為 _Grunnable,繼續執行它
  if !ok {
   casgstatus(gp, _Gwaiting, _Grunnable)
   execute(gp, true) // Schedule it back, never returns.
  }
 }
  // 否則重新調度其他協程
 schedule()
}

正常情況下,gopark 設置協程為 Gwaiting 狀態,然后重新調度,相當于該協程就暫停執行了。但是從開始執行 gopark 到現在可能情況已經發生了變化,所以 Go 還會執行一下鉤子函數 waitunlockf,這里是 netpollblockcommit。

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
  // 如果當前還是 pdWait 狀態則修改 pollDesc 的 rg 或 wg 字段為當前協程結構體
 r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
  // 修改成功則等待者加 1
 if r {
  netpollAdjustWaiters(1)
 }
  // 返回是否修改成功
 return r
}

正常情況下,這個過程是 pollDesc 的 rg 或 wg 字段從 Nil 到 pdWait 再到協程結構體地址,除非執行過程中事件已經就緒。從前面的分析可以看到啟動一個服務器并調用 Accept 后為什么協程會阻塞了,從中也可以看到 Go 中以同步方式寫異步代碼的底層實現。

那么協程阻塞后,什么時候才會喚醒呢?又是怎么喚醒的?Go 會在某些時機調用 IO 多路復用模塊的 netpoll 獲取就緒的事件,從而喚醒關聯的協程。這種時機有幾個,比如在 sysmon 線程中定時獲取,或者調度時獲取,我們搜索 netpoll 函數的調用就可以看到,下面以 sysmon 線程的處理為例。

if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
  // 更新上次 poll 的時間
  sched.lastpoll.CompareAndSwap(lastpoll, now)
  // 通過 IO 多路復用模塊獲取就緒的事件(所以關聯的 g)列表
  list, delta := netpoll(0) // non-blocking - returns list of goroutines
  if !list.empty() {
    // 把g 改成就緒并放入隊列等待調度
    injectglist(&list)
    incidlelocked(1)
    // 等待者減 delta(負數)
    netpollAdjustWaiters(delta)
  }
}

前面介紹過 netpoll 最終會調 netpollready。

func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
  // 喚醒的協程個數
 delta := int32(0)
 var rg, wg *g
  // 喚醒對應的等待者,讀或寫
 if mode == 'r' || mode == 'r'+'w' {
  rg = netpollunblock(pd, 'r', true, &delta)
 }
 if mode == 'w' || mode == 'r'+'w' {
  wg = netpollunblock(pd, 'w', true, &delta)
 }
  // 喚醒成功插入隊列
 if rg != nil {
  toRun.push(rg)
 }
 if wg != nil {
  toRun.push(wg)
 }
 return delta
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 for {
    // 當前狀態
  old := gpp.Load()
    // 之前就是 pdReady 則不需要再處理,等待上層消費后改成 Nil
  if old == pdReady {
   return nil
  }
    // 新狀態
  new := pdNil
    // 這里是 true,設置超時并且超時時 ioready 是 false
  if ioready {
   new = pdReady
  }
    // 改成 pdReady 狀態
  if gpp.CompareAndSwap(old, new) {
      // 當前狀態如果是 pdWait 則改成 old,即不需要處理該協程,
      // 前面介紹阻塞協程時講到,pdWait 是準備進入阻塞狀態,然后在 netpollblockcommit 會再進一步判斷,如果這里改成 pdReady 狀態,則協程不會阻塞。
   if old == pdWait {
    old = pdNil
   } else if old != pdNil { // old 是等待讀寫事件的協程
    *delta -= 1 // 等待者減一
   }
      // 返回待喚醒的協程
   return (*g)(unsafe.Pointer(old))
  }
 }
}

netpollunblock 第一種處理場景是事件就緒時把狀態改成 pdReady,第二種處理場景是事件沒有就緒,但設置了超時時間并且已經超時,則把狀態改成 Nil,把該協程改成就緒狀態,然后返回阻塞在該事件的協程并把它加入到隊列中等待調度執行。

func injectglist(glist *gList) {
 head := glist.head.ptr()
 var tail *g
 qsize := 0
  // 改成就緒狀態
 for gp := head; gp != nil; gp = gp.schedlink.ptr() {
  tail = gp
  qsize++
  casgstatus(gp, _Gwaiting, _Grunnable)
 }

 // Turn the gList into a gQueue.
 var q gQueue
 q.head.set(head)
 q.tail.set(tail)
 *glist = gList{}
  // 啟動新的線程處理
 startIdle := func(n int) {
  for i := 0; i < n; i++ {
   mp := acquirem() // See comment in startm.
   lock(&sched.lock)

   pp, _ := pidlegetSpinning(0)
   if pp == nil {
    unlock(&sched.lock)
    releasem(mp)
    break
   }

   startm(pp, false, true)
   unlock(&sched.lock)
   releasem(mp)
  }
 }

 pp := getg().m.p.ptr()
 if pp == nil {
  lock(&sched.lock)
    // 放入全局隊列
  globrunqputbatch(&q, int32(qsize))
  unlock(&sched.lock)
  startIdle(qsize)
  return
 }
  // 放到 p 本地隊列
 if !q.empty() {
  runqputbatch(pp, &q, qsize)
 }
}

協程和 IO 多路復用的結合思路也可以參考 Russ Cox 寫的 libtask 和這個分析文章 https://zhuanlan.zhihu.com/p/360477474。

責任編輯:武曉燕 來源: 編程雜技
相關推薦

2024-02-02 11:03:11

React數據Ref

2024-02-21 19:02:05

Go模板化方式

2022-01-17 07:50:37

Go代碼規范

2022-03-05 23:29:18

LibuvwatchdogNode.js

2022-06-16 07:50:35

數據結構鏈表

2022-11-21 16:57:20

2022-08-29 08:05:44

Go類型JSON

2022-11-08 08:45:30

Prettier代碼格式化工具

2024-07-29 10:35:44

KubernetesCSI存儲

2024-08-19 10:24:14

2024-01-29 08:21:59

AndroidOpenCV車牌

2023-10-30 07:05:31

2023-12-27 07:31:45

json產品場景

2024-04-28 08:24:27

分布式架構Istio

2024-01-18 09:38:00

Java注解JDK5

2024-03-12 08:37:32

asyncawaitJavaScript

2023-12-07 12:29:49

Nginx負載均衡策略

2022-07-08 09:27:48

CSSIFC模型

2024-01-19 08:25:38

死鎖Java通信

2023-07-26 13:11:21

ChatGPT平臺工具
點贊
收藏

51CTO技術棧公眾號

欧美日韩国产123区| 国产一级黄色录像| 久久久精品成人| brazzers在线观看| 日韩精品水蜜桃| 91麻豆高清视频| 久久在线精品视频| 999在线免费视频| 欧美香蕉爽爽人人爽| 午夜精品电影| 欧美一级黄色片| 久久五月天婷婷| 精品无码人妻一区二区三区| 中文幕av一区二区三区佐山爱| 国产欧美精品区一区二区三区 | 天堂中文av在线资源库| 国产麻豆精品一区二区| 在线精品视频视频中文字幕| 欧美精品一区免费| 欧美一区二区三区激情| 激情久久久久久久| 精品国产髙清在线看国产毛片| 自拍偷拍一区二区三区| 91国内精品久久久| 97久久视频| 911精品产国品一二三产区| 先锋影音亚洲资源| 国产一区二区三区在线观看| 国产亚洲激情| 精品夜色国产国偷在线| aⅴ在线免费观看| 可以在线观看的av| 日韩高清不卡在线| 伊人久久男人天堂| 久久国产精品无码一级毛片| 在线免费看h| 伊人开心综合网| 国产精品久久九九| 国产黄色片免费看| 国产传媒欧美日韩成人精品大片| 色狠狠色狠狠综合| 日韩三级电影| 99在线小视频| 一本色道久久综合亚洲精品高清| 亚洲免费精彩视频| 黄大色黄女片18第一次| 宅男在线观看免费高清网站| 成人一道本在线| 欧美一区二区三区四区在线| 国产精品无码无卡无需播放器| 小说区图片区色综合区| 欧美少妇一区二区| 久久久久久久香蕉| 精品美女视频在线观看免费软件| 人人狠狠综合久久亚洲| 久久国产精品久久久久| 水蜜桃av无码| 欧美大胆a级| 在线观看免费亚洲| 青青草综合在线| 八戒八戒神马在线电影| 99精品视频在线观看| 国产精品久久久久久久久借妻| 老司机深夜福利网站| 51精品国产| 在线观看精品一区| 天堂在线资源视频| 男女羞羞视频在线观看| 国产亚洲欧美中文| 99蜜桃在线观看免费视频网站| 亚洲成熟少妇视频在线观看| 午夜精品999| 欧美精品第一页在线播放| 99久久久无码国产精品衣服| 成人在线视频你懂的| 欧美色综合网站| 五月天视频在线观看| 成人欧美一区二区三区的电影| 亚洲免费观看高清| 日韩免费电影一区二区| 午夜国产福利在线| 99精品国产一区二区三区不卡| 久久久久一区二区| 亚洲av无码国产精品永久一区| 日韩黄色免费电影| 国产精品一久久香蕉国产线看观看| 亚洲黄色一区二区| 国产精品a级| 久久综合久久美利坚合众国| 久久久久亚洲av成人片| 亚洲欧美视频一区二区三区| 欧美高清自拍一区| 永久免费看mv网站入口| 成人精品视频| 亚洲日本中文字幕免费在线不卡| 男女性杂交内射妇女bbwxz| 国产精品久久久久久久久久久久久久久 | 欧美人与动牲性行为| 日韩欧美福利视频| 99久久免费观看| 国内精品久久久久久野外| 中文字幕高清一区| 日韩三级在线播放| 91网址在线观看| 欧美性猛交xxxx乱大交蜜桃| 中文字幕在线视频精品| 天堂在线精品| 久久91亚洲人成电影网站| 精品国产国产综合精品| 亚洲毛片视频| 4388成人网| 国产伦理一区二区| 国产一二精品视频| 成人日韩在线电影| 国产精品国产精品国产专区| 麻豆精品蜜桃视频网站| 国产精品视频99| 国产一区二区视频免费观看| 99久久99精品久久久久久| www亚洲国产| 三级资源在线| 亚洲不卡一区二区三区| 日韩a∨精品日韩在线观看| 欧亚在线中文字幕免费| 色综合久久中文字幕| 熟妇人妻va精品中文字幕| 巨胸喷奶水www久久久免费动漫| 欧美亚洲尤物久久| 久久久久xxxx| av一区二区高清| 俺也去精品视频在线观看| h色网站在线观看| 亚洲一级一区| 91日韩在线视频| 成人免费视频国产免费麻豆| av一区二区久久| 欧美日韩日本网| 在线免费av网站| 亚洲另类色综合网站| 亚洲人辣妹窥探嘘嘘| 韩国三级成人在线| 亚洲国产精彩中文乱码av| 五级黄高潮片90分钟视频| 成人中文在线| 国产成人综合一区二区三区| 国产精品久久综合青草亚洲AV| 国产欧美日韩卡一| 韩国一区二区av| 九一国产精品| 久久99久久99精品免观看粉嫩| ,亚洲人成毛片在线播放| 国产精品麻豆99久久久久久| 国产 欧美 日韩 一区| 日韩三级av高清片| 国产亚洲一区二区在线| 欧美日韩在线观看成人| 男女精品网站| 亚洲在线观看视频| 日本天堂影院在线视频| 欧美日韩视频免费播放| 色呦色呦色精品| 亚洲精品无吗| 九九九久久久久久| 亚洲AV无码国产精品午夜字幕| 亚洲一级在线观看| 亚洲精品永久视频| 亚洲电影在线一区二区三区| 日本高清不卡在线| 国内老熟妇对白xxxxhd| 亚洲精品成人少妇| 中文字幕在线视频播放| 国产精品综合色区在线观看| 日本在线观看一区二区| 欧美91在线|欧美| 亚洲女人天堂av| 日批视频免费观看| 99riav久久精品riav| 免费在线观看毛片网站| 欧美激情理论| 成人免费看片网站| av在线播放观看| 亚洲成人动漫在线播放| 中文字幕xxxx| 中文字幕一区二区视频| av免费在线播放网站| 99久久www免费| 国产精品视频精品视频| 91麻豆国产福利在线观看宅福利| 日韩成人xxxx| 国产奶水涨喷在线播放| 国产精品正在播放| 久久视频这里有精品| 日韩极品一区| 激情小说综合网| 好吊日av在线| 精品国偷自产国产一区| 国产三级国产精品国产国在线观看| 成av人片一区二区| 日韩一级性生活片| 一区二区三区亚洲变态调教大结局| 26uuu日韩精品一区二区| 黄色免费在线看| 亚洲欧美日韩第一区| 国产黄色片av| 欧美日韩不卡一区| 黄色片免费观看视频| 亚洲黄色尤物视频| 色www亚洲国产阿娇yao| 蜜臀av性久久久久av蜜臀妖精| 日本高清视频一区二区三区 | 性做久久久久久| 人妻无码一区二区三区免费| 99国产精品久久久久久久久久 | 男女午夜激情视频| 亚洲欧美一区在线| 亚洲欧洲精品在线 | 高潮毛片在线观看| 亚洲最新视频在线| 天堂在线视频免费观看| 精品久久久久久久久国产字幕 | 欧美激情视频一区| 黄色网页在线播放| 日韩在线资源网| 99精品视频免费看| 欧美美女网站色| 麻豆一区产品精品蜜桃的特点| 国产iv一区二区三区| 久久国产午夜精品理论片最新版本| 欧美电影免费观看高清| 欧美一级二级三级九九九| 牛牛影视久久网| 国产一区自拍视频| 视频在线日韩| 久久久精品影院| 色窝窝无码一区二区三区成人网站| 在线播放91灌醉迷j高跟美女| 波多野结衣电影在线播放| 中文字幕字幕中文在线中不卡视频| 麻豆传媒在线看| 亚洲综合社区| 午夜免费福利小电影| 亚洲看片一区| 97国产在线播放| 日韩亚洲国产欧美| 免费一级特黄毛片| 精品久久成人| 日产国产精品精品a∨| 综合亚洲自拍| 欧美日韩中文国产一区发布 | 日韩www在线| 少妇精品视频一区二区 | 2019中文字幕在线视频| 亚洲最新中文字幕| 欧美精品hd| 亚洲第一天堂av| 无码精品视频一区二区三区| 欧美色图天堂网| 91亚洲国产成人久久精品麻豆| 欧美老肥妇做.爰bbww| 国产精品怡红院| 日韩美一区二区三区| 中文字幕+乱码+中文| 亚洲一区视频在线| 国产成人亚洲欧洲在线| 狠狠躁18三区二区一区| 在线观看亚洲黄色| 欧美精品久久天天躁| www.蜜臀av| 欧美精品1区2区3区| 99热这里只有精| 亚洲精品99久久久久| 国产夫绿帽单男3p精品视频| 日韩欧美国产高清| 亚洲av片一区二区三区| 国产香蕉精品视频一区二区三区| 在线观看麻豆| 欧美激情亚洲另类| 日本电影欧美片| 午夜精品久久久久久久久久久久| 蜜桃视频网站在线观看| 久国内精品在线| 日韩电影免费看| 国产精品天天狠天天看| www.久久东京| 日韩欧美第二区在线观看| 欧美成人tv| 欧美成人精品欧美一级乱| 国产资源在线一区| 天堂久久久久久| 中文字幕在线观看不卡| 日本天堂网在线观看| 欧美性感一类影片在线播放| 性生交大片免费看女人按摩| 亚洲图片制服诱惑| av影视在线| 91欧美激情另类亚洲| 日日天天久久| www.18av.com| 热久久久久久久| 尤物网站在线观看| 亚洲欧美日韩国产手机在线| 黄色免费av网站| 色域天天综合网| 国产激情视频在线播放| 中国人与牲禽动交精品| av电影在线观看| 久久久亚洲精选| 黄色在线网站噜噜噜| 欧美一级黄色网| 国产日韩在线观看视频| 日本精品视频一区| 日韩午夜av在线| 苍井空张开腿实干12次| 成人在线视频一区| 青青青手机在线视频| 欧美性极品xxxx娇小| 亚洲国产精品欧美久久| www国产亚洲精品久久网站| 三级成人黄色影院| 麻豆精品蜜桃一区二区三区| 国产精品hd| 成年人性生活视频| 中文字幕在线不卡国产视频| 懂色av蜜臀av粉嫩av分享吧最新章节| 亚洲第一福利网站| 欧美人与禽猛交乱配| 亚洲aⅴ男人的天堂在线观看| 日韩精品不卡一区二区| 日本美女高潮视频| 国产欧美视频一区二区三区| 久久久久久久久久影院| 精品少妇一区二区三区视频免付费 | 久久久久999| vam成人资源在线观看| 亚洲一区三区| 影音先锋久久| 中文字幕欧美视频| 91婷婷韩国欧美一区二区| 精品在线视频观看| 欧美成人一区二区三区片免费 | 欧美套图亚洲一区| 欧美在线视频观看| 亚洲毛片免费看| 亚洲成熟丰满熟妇高潮xxxxx| 91麻豆精品秘密| 韩国av中文字幕| 亚洲美女精品久久| 精品亚洲美女网站| 婷婷久久伊人| 日韩国产在线一| 久久视频一区二区三区| 欧美日韩中字一区| 免费在线观看av片| 亚洲xxxx做受欧美| 国产欧美日韩在线一区二区| 色婷婷综合久久久久中文字幕 | 国产在线观看免费视频今夜| 欧美性猛交xxxx乱大交蜜桃| 暖暖视频在线免费观看| 国产成人av在线播放| 欧美一级精品片在线看| 亚洲va在线va天堂va偷拍| 亚洲私人黄色宅男| 69xxxx国产| 色综合影院在线| 欧美成年网站| 国产96在线 | 亚洲| 337p粉嫩大胆噜噜噜噜噜91av| 无码日韩精品一区二区| 日韩在线观看免费网站 | 黑人巨大精品欧美黑白配亚洲| 熟女丰满老熟女熟妇| 色www精品视频在线观看| 9191在线观看| 国产福利久久精品| 国产视频一区欧美| 国产黄色录像片| 欧美va日韩va| 欧美日韩在线精品一区二区三区激情综合 | 狠狠爱综合网| 制服丝袜中文字幕第一页| 亚洲一二三级电影| 国产中文在线视频| 91日本在线观看| 久久精品女人| 欧美三根一起进三p| 亚洲美女黄色片| 亚洲精品一二三**| 国产v亚洲v天堂无码久久久| 一区二区三区在线观看欧美 | jizzjizzjizz欧美| 成人一级片网站| 亚洲精品国产成人久久av盗摄| 欧美美女色图| 成人免费视频观看视频| 日本人妖一区二区| 日产亚洲一区二区三区| 日韩中文字幕免费视频| 日韩aaa久久蜜桃av| 国产农村妇女精品久久|