精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SeaweedFS 分布式文件系統(tǒng)源碼分析

開發(fā) 架構(gòu)
Master Server 支持多節(jié)點(奇數(shù))部署。使用 Raft 一致性算法來選舉 Leader 節(jié)點,這樣可以保證在 Leader 節(jié)點宕機的情況下,其他節(jié)點可以重新選舉出新的 Leader 節(jié)點,從而保證系統(tǒng)的高可用性。

本文基于 seaweedfs 3.46[1]

SeaweedFS 的架構(gòu)包括 Master Server、Volume Server 和 Filer Server 。

圖片

啟動 Master Server

啟動一個 Master Server 可以使用以下命令:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動入口以及所有的參數(shù)定義在 weed/command/master.go ,默認情況 http 監(jiān)聽端口使用 9333 ,grpc 監(jiān)聽端口則在 http 端口的基礎(chǔ)上加 10000 (所有組件的默認規(guī)則)即 19333 :

if *masterOption.portGrpc == 0 {
 *masterOption.portGrpc = 10000 + *masterOption.port
}

Master Server 支持多節(jié)點(奇數(shù))部署。使用 Raft 一致性算法來選舉 Leader 節(jié)點,這樣可以保證在 Leader 節(jié)點宕機的情況下,其他節(jié)點可以重新選舉出新的 Leader 節(jié)點,從而保證系統(tǒng)的高可用性。

如下,啟動一個由三個 Master Server 節(jié)點所組成的集群:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9333 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9334 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9335 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"

當 Master Server 啟動時,它會嘗試加入集群并參與 Leader 選舉。一旦選舉完成,Leader 節(jié)點將負責(zé)管理整個集群以及 Volume Server 。

首先會創(chuàng)建一個 Master Server 包裝的 weed_server.RaftServer 對象:

raftServer, err = weed_server.NewRaftServer(raftServerOption)
if raftServer == nil {
 glog.Fatalf("please verify %s is writable, see https://github.com/seaweedfs/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}

在 weed_server.NewRaftServer() 方法中會創(chuàng)建好 Raft 節(jié)點所需的各種參數(shù)和對象,然后調(diào)用 github.com/seaweedfs/raft[2] 庫創(chuàng)建 RaftServer 對象并啟動 Raft 節(jié)點:

type RaftServer struct {
 // 存儲初始節(jié)點信息
 peers map[string]pb.ServerAddress
 // Raft 節(jié)點
 raftServer raft.Server
 // HashiCorp Raft 節(jié)點
 RaftHashicorp *hashicorpRaft.Raft
 // 用于管理 Raft 節(jié)點之間的通信
 TransportManager *transport.Manager
 // Raft 節(jié)點的數(shù)據(jù)目錄
 dataDir string
 // Raft 節(jié)點的地址
 serverAddr pb.ServerAddress
 // Raft 集群的拓撲結(jié)構(gòu)
 topo *topology.Topology
 // Raft 節(jié)點的 gRPC 服務(wù)
 *raft.GrpcServer
}

func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
 // 通過 option 創(chuàng)建一個 RaftServer 對象 s
 s := &RaftServer{
  peers:      option.Peers,
  serverAddr: option.ServerAddr,
  dataDir:    option.DataDir,
  topo:       option.Topo,
 }

 //...

 // 調(diào)用 github.com/seaweedfs/raft 庫,創(chuàng)建 RaftServer 對象
 s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")

 //...

 // 啟動 Raft 節(jié)點
 if err := s.raftServer.Start(); err != nil {
  return nil, err
 }

 // 將節(jié)點加入到 Raft 集群中
 for name, peer := range s.peers {
  if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
   return nil, err
  }
 }

 //...

 glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())

 return s, nil
}

最后,會打印出當前的 Leader 節(jié)點,如果對 Raft 選舉算法的處理細節(jié)感興趣,可以繼續(xù)深入 s.raftServer.Start() 的實現(xiàn)。

Raft 節(jié)點啟動成功后,Master Server 會注冊一些集群相關(guān)的接口,方便查看集群狀態(tài):

r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET", "HEAD")
if *masterOption.raftHashicorp {
 r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
}

請求如下:

$ curl http://127.0.0.1:9333/cluster/status
{"IsLeader":true,"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9334"]}
$ curl http://127.0.0.1:9334/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9333"]}
$ curl http://127.0.0.1:9335/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9333","127.0.0.1:9334"]}

啟動 Volume Server

啟動一個 Volume Server 可以使用以下命令:

weed volume -mserver="127.0.0.1:9333" -dir=data -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動入口以及所有的參數(shù)定義在 weed/command/volume.go ,默認情況 http 監(jiān)聽端口使用 8080 ,grpc 監(jiān)聽端口使用 18080 。

其中,-mserver 為 Master Server 連接地址,當需要連接的 Master Server 為集群時,可以將多個 Master Server 的連接地址用逗號分隔; -dir 則用來指定 Volume Server 存儲數(shù)據(jù)文件的目錄。

和 Master Server 不同,Volume Server 支持橫向擴展,其節(jié)點數(shù)量規(guī)模可以隨著數(shù)據(jù)量和性能需求的變化而隨時動態(tài)調(diào)整。

一旦 Volume Server 啟動后,就會與 Master Server 保持通信,匯報自身的狀態(tài),并根據(jù) Master Server 的指示執(zhí)行創(chuàng)建、刪除、修復(fù)等操作。

核心邏輯在 weed/server/volume_grpc_client_to_master.go 的 VolumeServer.doHeartbeat 方法。

首先會創(chuàng)建一個 Master Server 的 gRPC 連接客戶端,并使用該客戶端調(diào)用 SendHeartbeat 方法:

// 創(chuàng)建 Master Server 的 gRPC 連接客戶端
client := master_pb.NewSeaweedClient(grpcConnection)
// 調(diào)用 SendHeartbeat
stream, err := client.SendHeartbeat(ctx)
if err != nil {
 glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
 return "", err
}

SendHeartbeat 方法是一個雙向流式 RPC ,允許在一次調(diào)用中發(fā)送多個請求和響應(yīng),其 ProtoBuf 定義如下:

rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}

接著創(chuàng)建一個 goroutine 用來處理從 Master Server 發(fā)送過來的 Heartbeat 請求:

go func() {
 for {
  // 從輸入流中讀取 Heartbeat 請求
  in, err := stream.Recv()
  if err != nil {
   doneChan <- err
   return
  }
  // ...

  // 如果 Heartbeat 請求中包含了卷大小限制,并且該限制和當前 Volume Server 中保存的限制不同
  if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
   // 將 Volume Server 中保存的限制更新為 Heartbeat 請求中的限制
   vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
   // 調(diào)用 vs.store.MaybeAdjustVolumeMax() 方法重新計算卷的最大容量
   if vs.store.MaybeAdjustVolumeMax() {
    // 如果計算結(jié)果發(fā)生了變化,則使用 stream.Send() 方法向 Master Server 發(fā)送 Heartbeat 響應(yīng)
    if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
     glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
     return
    }
   }
  }
  // 如果 Heartbeat 請求中包含了新的 Master Server 地址,并且該地址和當前地址不同
  if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
   // 通知主函數(shù)切換新的 Master Server 地址作為 Leader
   glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
   newLeader = pb.ServerAddress(in.GetLeader())
   doneChan <- nil
   return
  }
 }
}()

最后使用一個 for select 來監(jiān)聽來自 Volume Server 存儲層的四個通道:NewVolumesChan、NewEcShardsChan、DeletedVolumesChan 和 DeletedEcShardsChan。每當有新的卷或 EC 分片被創(chuàng)建或刪除時,會生成一個 Heartbeat 消息,并使用 stream.Send() 方法將其發(fā)送到 Master Server ,同時也會定期發(fā)送心跳消息給 Master Server :

for {
 select {
 // 有新的卷被創(chuàng)建
 case volumeMessage := <-vs.store.NewVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有新的 EC 分片被創(chuàng)建
 case ecShardMessage := <-vs.store.NewEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有卷被刪除
 case volumeMessage := <-vs.store.DeletedVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有 EC 分片被刪除
 case ecShardMessage := <-vs.store.DeletedEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 發(fā)送卷信息的心跳消息
 case <-volumeTickChan.C:
  glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
  vs.store.MaybeAdjustVolumeMax()
  if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // 發(fā)送 EC 分片信息的心跳消息
 case <-ecShardTickChan.C:
  glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
  if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // Volume Server 停止,退出監(jiān)聽
 case err = <-doneChan:
  return
 // 用于在 Volume Server 停止時發(fā)送最終的心跳消息
 case <-vs.stopChan:
  // ...
  glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
  if err = stream.Send(emptyBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
  return
 }
}

啟動 Filer Server

啟動一個 Filer Server 可以使用以下命令:

weed filer -s3 -master="127.0.0.1:9333" -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動入口以及所有的參數(shù)定義在 weed/command/filer.go ,默認情況 http 監(jiān)聽端口使用 8888 ,grpc 監(jiān)聽端口使用 18888 。

在這里,-master 為 Master Server 連接地址,同樣地,當需要連接的 Master Server 為集群時,可以將多個 Master Server 的連接地址用逗號分隔; -s3 則代表要啟動 S3 網(wǎng)關(guān)功能,默認監(jiān)聽 8333 端口。

Filer Server 可以理解為一個文件管理器,通過向下對接 Volume Server 與 Master Server,對外提供豐富的功能與特性,除了自身提供的 API 接口,還支持擴展其它比如 POSIX ,WebDAV,S3 等的文件操作接口。

Filer Server 通過外部數(shù)據(jù)庫存儲文件的元數(shù)據(jù)信息。默認情況下,使用的是 leveldb ,支持替換為其它流行的數(shù)據(jù)庫,例如 Sqlite、MySql、Etcd 等,具體可以參考 wiki/Filer-Stores[3] 。

作為一個 API Server ,F(xiàn)iler Server 在架構(gòu)上就是一個服務(wù)端+數(shù)據(jù)庫模型,其節(jié)點的數(shù)量和規(guī)模可以根據(jù)不同的工作負載和使用情況進行優(yōu)化和調(diào)整。

上傳文件

首先分析 Filer Server 自身提供的 API 接口,上傳文件可以直接調(diào)用 :

$ curl -F "file_name=@test.txt" -X POST "http://127.0.0.1:8888"
{"name":"test.txt","size":14}

文件上傳的接口定義在 weed/server/filer_server_handlers_write.go 的 PostHandler 方法:

func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
 // 解析請求的目標路徑
 // ...
 // 解析請求的查詢參數(shù),用于確定文件的存儲位置和屬性
 // ...
 if query.Has("mv.from") {
  // 若查詢參數(shù)中出現(xiàn) mv.from ,則進行文件移動操作
  fs.move(ctx, w, r, so)
 } else {
  // 文件上傳操作,自動分塊
  fs.autoChunk(ctx, w, r, contentLength, so)
 }

 util.CloseRequest(r)
}

跟蹤到 fs.autoChunk 方法:

func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
 //...

 if r.Method == "POST" {
  // 上傳文件
  if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
   reply, err = fs.mkdir(ctx, w, r)
  } else {
   // 自動分塊上傳
   reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
  }
 } else {
  // 創(chuàng)建目錄
  reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
 }

 //...
}

繼續(xù)來到 fs.doPostAutoChunk 方法:

func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {

 // 讀取上傳的文件內(nèi)容
 multipartReader, multipartReaderErr := r.MultipartReader()
 if multipartReaderErr != nil {
  return nil, nil, multipartReaderErr
 }

 // 讀取第一個分塊,在這里,我們只需要讀取第一個分塊,即上傳文件的內(nèi)容的分塊
 part1, part1Err := multipartReader.NextPart()
 if part1Err != nil {
  return nil, nil, part1Err
 }

 // 獲取文件名和 Content-Type
 fileName := part1.FileName()
 if fileName != "" {
  fileName = path.Base(fileName)
 }
 contentType := part1.Header.Get("Content-Type")
 if contentType == "application/octet-stream" {
  contentType = ""
 }

 // 核心邏輯
 // 將上傳的文件內(nèi)容轉(zhuǎn)換為文件分塊,并返回文件分塊的相關(guān)信息
 fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
 if err != nil {
  return nil, nil, err
 }

 // 計算文件內(nèi)容的 MD5 值
 md5bytes = md5Hash.Sum(nil)
 // 保存文件元數(shù)據(jù)信息
 filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
 if replyerr != nil {
  fs.filer.DeleteChunks(fileChunks)
 }

 return
}

這些都比較好讀,繼續(xù)跟蹤到核心邏輯處 fs.uploadReaderToChunks ,方法內(nèi)首先會進行一些正確性校驗和必要變量的初始化,然后開啟一個循環(huán),不斷讀取數(shù)據(jù)并將其轉(zhuǎn)換為一個或多個 Chunk :

func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
 // ...一系列操作
 // 進行一些正確性校驗和必要變量的初始化

 for {

  // 使用對象池機制限制 bytes.Buffer 對象的數(shù)量,優(yōu)化內(nèi)存占用
  bytesBufferLimitCond.L.Lock()
  for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
   glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
   bytesBufferLimitCond.Wait()
  }
  atomic.AddInt64(&bytesBufferCounter, 1)
  bytesBufferLimitCond.L.Unlock()

  bytesBuffer := bufPool.Get().(*bytes.Buffer)
  glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))

  // 【關(guān)鍵】分塊操作,每個塊就是一個 bytes.Buffer
  // 根據(jù) chunkSize 從 partReader 中讀取數(shù)據(jù),并將讀取的數(shù)據(jù)保存到 bytes.Buffer 對象中
  limitedReader := io.LimitReader(partReader, int64(chunkSize))

  bytesBuffer.Reset()

  dataSize, err := bytesBuffer.ReadFrom(limitedReader)

  // 處理讀取數(shù)據(jù)時可能出現(xiàn)的錯誤,以及在讀取完整個文件時的處理
  // ...

  wg.Add(1)
  // 開啟 goroutine 處理
  go func(offset int64) {
   defer func() {
    // 將 bytes.Buffer 對象歸還對象池
    bufPool.Put(bytesBuffer)
    atomic.AddInt64(&bytesBufferCounter, -1)
    // 通知其他 goroutine 可以使用更多的 bytes.Buffer 對象
    bytesBufferLimitCond.Signal()
    wg.Done()
   }()

   // 【關(guān)鍵】上傳數(shù)據(jù)塊
   chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)

   if toChunkErr != nil {
    // 記錄上傳錯誤
    uploadErrLock.Lock()
    if uploadErr == nil {
     uploadErr = toChunkErr
    }
    uploadErrLock.Unlock()
   }
   if chunks != nil {
    fileChunksLock.Lock()
    fileChunksSize := len(fileChunks) + len(chunks)
    for _, chunk := range chunks {
     // 【關(guān)鍵】將當前上傳的數(shù)據(jù)塊添加到 fileChunks 列表中
     fileChunks = append(fileChunks, chunk)
     glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
    }
    fileChunksLock.Unlock()
   }
  }(chunkOffset)

  // 更新已經(jīng)讀取的數(shù)據(jù)塊的大小
  chunkOffset = chunkOffset + dataSize

  if dataSize < int64(chunkSize) {
   // 已經(jīng)讀取完整個文件
   break
  }
 }

 wg.Wait()

 if uploadErr != nil {
  // 上傳出錯,刪除 fileChunks
  fs.filer.DeleteChunks(fileChunks)
  return nil, md5Hash, 0, uploadErr, nil
 }
 // 【關(guān)鍵】對已經(jīng)上傳的數(shù)據(jù)塊,即 fileChunks 進行排序,以便后續(xù)可以正確地進行數(shù)據(jù)合并
 slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
  return a.Offset < b.Offset
 })
 // 返回 fileChunks 給調(diào)用方保存
 return fileChunks, md5Hash, chunkOffset, nil, smallContent
}

文件的分塊操作都是在 Filer Server 完成的。而其中上傳數(shù)據(jù)塊的 fs.dataToChunk 方法會與 Master Server 進行交互。

該方法首先會調(diào)用 fs.assignNewFileInfo 向 Master Server 請求分配一個新的文件 ID(fid)以及上傳 URL :

fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
if uploadErr != nil {
 // ...
 return uploadErr
}

然后使用分配的 fid 調(diào)用上傳 URL 上傳數(shù)據(jù)塊:

uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
 // ...
 return uploadErr
}

這個由 Master Server 所分配的上傳 URL ,實際就是 Volume Server 的上傳地址,例 http://127.0.0.1:8080/14,1f343c431d ,其中 14,1f343c431d 就是文件 ID ,其實這個文件 ID 更準確地說應(yīng)該是代表一個數(shù)據(jù)塊的文件 ID。

SeaweedFS 會根據(jù) maxMB 參數(shù),來把文件拆分成多個塊存儲,默認大小是 4MB 。即一個 100MB 大小的文件,上傳到 SeaweedFS 后會被分成 25 個塊存儲,也就是申請分配了 25 個文件 ID 。

f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit")

到這里,總算捋清流程了。

那還有一個 S3 接口的文件上傳呢?

不用擔(dān)心,SeaweedFS S3 只是做了一個 API 的代理轉(zhuǎn)發(fā),依舊轉(zhuǎn)發(fā)到 Filer Server 自身提供的 API 接口,邏輯依舊和上面一致,代碼位置在 weed/s3api/s3api_object_handlers.go :

// 這里的 uploadUrl 實際就是 Filer Server 的地址
// 例如在名稱為 test 的 S3 Bucket 中上傳 test.txt 文件
// 則 uploadUrl 為: http://127.0.0.1:8888/buckets/test/test.txt
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")

下載文件

和上傳文件一樣,SeaweedFS S3 為文件下載做了一個代理轉(zhuǎn)發(fā),轉(zhuǎn)發(fā)到 Filer Server 自身提供的 API 接口:

// 這里的 destUrl 實際就是 Filer Server 的地址
// 例如要下載 test Bucket 中的 test.txt 文件
// 則 destUrl 為: http://127.0.0.1:8888/buckets/test/test.txt
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)

所以,當下載一個文件時:

$ curl http://127.0.0.1:8888/test.txt
hello test.txt

直接來看 weed/server/filer_server_handlers_read.go 的 GetOrHeadHandler 接口:

func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {

 // ...
 // 從 URL 中獲取文件或文件夾路徑

 // 根據(jù)文件或文件夾的完整路徑從元數(shù)據(jù)數(shù)據(jù)庫中查找出 Entry 記錄(即文件的元數(shù)據(jù)信息)

 // 若是文件夾,則列出文件夾下的文件
 // ...

 // 如果指定了 metadata=true 參數(shù),則直接返回文件或文件夾的元數(shù)據(jù)信息
 if query.Get("metadata") == "true" {
  // ...
  return
 }

 // 減少服務(wù)器帶寬
 // 通過 Etag 資源標識對比資源是否發(fā)生變化
 etag := filer.ETagEntry(entry)
 if checkPreconditions(w, r, entry) {
  // 如果資源未發(fā)生改變,則返回 304 Not Modified 響應(yīng),不返回具體的資源
  // 客戶端可以直接讀取緩存中的數(shù)據(jù)
  return
 }

 // 設(shè)置 ETag 標識到響應(yīng)頭
 setEtag(w, etag)

 // ...

 // 這里是用來處理獲取圖片文件的邏輯
 if rangeReq := r.Header.Get("Range"); rangeReq == "" {
  // ...
 }

 // 獲取普通文件核心邏輯
 processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
  // 偏移量從請求頭中獲取,例 Range: bytes=100-199
  // 若無指定偏移量,默認為 0
  // 判斷請求的范圍是否在文件的內(nèi)容大小范圍內(nèi)
  if offset+size <= int64(len(entry.Content)) {
   // ...
   return err
  }
  // 從元數(shù)據(jù)數(shù)據(jù)庫獲取到的chunks信息
  chunks := entry.GetChunks()
  // 判斷文件是否只存在于遠程存儲中,例如 AWS S3 、Google Cloud Storage 等
  if entry.IsInRemoteOnly() {
   // 將遠程對象緩存到本地集群,并更新新的chunks
   // ...
  }

  // 【核心】開始讀取文件并寫入 HTTP 響應(yīng)
  // MasterClient :Master 節(jié)點的客戶端
  // chunks :要讀取的文件數(shù)據(jù)塊列表
  // offset :請求的文件內(nèi)容的起始位置
  // size :請求的文件內(nèi)容的大小
  // DownloadMaxBytesPs :下載速率的限制,單位是字節(jié)/秒
  err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
  if err != nil {
   stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
   glog.Errorf("failed to stream content %s: %v", r.URL, err)
  }
  return err
 })
}

根據(jù)代碼,我們可以直接通過 metadata=true 查詢參數(shù)查看文件的元數(shù)據(jù)信息:

$ curl http://127.0.0.1:8888/test.txt?metadata=true
{"FullPath":"/test.txt","Mtime":"2023-04-23T17:18:37+08:00","Crtime":"2023-04-23T17:18:37+08:00","Mode":432,"Uid":4294967295,"Gid":4294967295,"Mime":"text/plain","TtlSec":0,"UserName":"","GroupNames":null,"SymlinkTarget":"","Md5":"wuSNy045Bd4p8mTjIc40cg==","FileSize":14,"Rdev":0,"Inode":0,"Extended":null,"chunks":[{"file_id":"14,1f343c431d","size":14,"modified_ts_ns":1682241517592601300,"e_tag":"wuSNy045Bd4p8mTjIc40cg==","fid":{"volume_id":14,"file_key":31,"cookie":876364573},"is_compressed":true}],"HardLinkId":null,"HardLinkCounter":0,"Content":null,"Remote":null,"Quota":0}

其中最重要的就是 chunks 信息,里面定義了該文件的所有數(shù)據(jù)塊信息,只要把所有數(shù)據(jù)塊拼湊一起,就可以還原出整個文件。文件大小的原因,這里剛好只有一個塊,其文件 ID 為 14,1f343c431d 。

繼續(xù)解讀文件下載的核心方法 filer.StreamContentWithThrottler ,首先獲取所有文件 ID 所對應(yīng)的 URL 列表:

// 將 chunks 轉(zhuǎn)換為視圖列表 chunkViews
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)

fileId2Url := make(map[string][]string)

// 通過 chunkViews.Front() 獲取 chunkViews 列表的頭部元素,然后在每次迭代中將 x 移動到下一個元素,直到遍歷完整個列表
for x := chunkViews.Front(); x != nil; x = x.Next {
 // 從 x.Value 中獲取 chunkView 對象
 chunkView := x.Value
 var urlStrings []string
 var err error
 // 獲取 chunkView 對應(yīng)的文件 ID 的 URL 列表,并將 URL 列表存儲在 urlStrings 變量中
 // 在分布式系統(tǒng)中,網(wǎng)絡(luò)故障和其他因素可能導(dǎo)致某些請求失敗,因此需要多次嘗試獲取 URL 列表,以提高獲取成功的概率
 for _, backoff := range getLookupFileIdBackoffSchedule {
  urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  if err == nil && len(urlStrings) > 0 {
   break
  }
  glog.V(4).Infof("waiting for chunk: %s", chunkView.FileId)
  time.Sleep(backoff)
 }
 // 錯誤處理
 // ...
 fileId2Url[chunkView.FileId] = urlStrings
}

然后,通過獲取到的 URL 列表下載文件的所有 chunk :

// 下載速度限制器
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
// 通過遍歷 chunkViews 列表來下載每個 chunk
for x := chunkViews.Front(); x != nil; x = x.Next {
 chunkView := x.Value
 // 檢查文件偏移量
 if offset < chunkView.ViewOffset {
  // ...
 }
 urlStrings := fileId2Url[chunkView.FileId]
 start := time.Now()
 // 【核心】從 URL 列表中讀取 chunkView 的數(shù)據(jù),并將數(shù)據(jù)寫入到 writer 中給到客戶端
 err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
 // 更新文件偏移量
 offset += int64(chunkView.ViewSize)
 // 更新剩余數(shù)據(jù)大小
 remaining -= int64(chunkView.ViewSize)
 // ...
}
// 檢查文件的所有數(shù)據(jù)是否都已經(jīng)成功下載
if remaining > 0 {
 glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
 err := writeZero(writer, remaining)
 if err != nil {
  return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
 }
}

可以總結(jié)出,下載文件本質(zhì)也是和 Master Server 交互,通過文件 ID 獲取到對應(yīng) Volume Server 的數(shù)據(jù)塊下載地址列表,按照列表順序請求下載數(shù)據(jù)塊,最后重新整合成了一個完整的文件返回給客戶端。

最后,附上文件下載的流程:

圖片

參考資料

[1]seaweedfs 3.46: https://github.com/seaweedfs/seaweedfs/tree/3.46

[2]github.com/seaweedfs/raft: https://github.com/seaweedfs/raft/tree/v1.1.0

[3]wiki/Filer-Stores: https://github.com/seaweedfs/seaweedfs/wiki/Filer-Stores

責(zé)任編輯:武曉燕 來源: gopher云原生
相關(guān)推薦

2010-11-01 05:50:46

分布式文件系統(tǒng)

2017-10-17 08:33:31

存儲系統(tǒng)分布式

2010-11-15 13:24:07

分布式文件系統(tǒng)

2013-01-07 10:29:31

大數(shù)據(jù)

2012-08-31 16:04:11

HDFS分布式文件系統(tǒng)

2013-06-18 14:00:59

HDFS分布式文件系統(tǒng)

2012-09-19 15:05:24

MogileFS分布式文件系統(tǒng)

2010-06-04 18:45:43

Hadoop分布式文件

2012-09-19 13:43:13

OpenAFS分布式文件系統(tǒng)

2013-05-27 14:46:06

文件系統(tǒng)分布式文件系統(tǒng)

2011-07-15 17:48:27

Platform

2011-03-16 14:23:38

分布式文件

2012-10-09 16:43:47

FastDFS分布式文件系統(tǒng)

2012-05-10 14:04:07

分布式文件系統(tǒng)架構(gòu)

2012-05-10 15:23:53

分布式文件系統(tǒng)測試

2020-01-03 08:33:57

Ceph硬件系統(tǒng)

2022-09-13 07:51:08

JuiceFS分布式文件系統(tǒng)

2012-07-20 14:40:22

2013-01-07 10:42:43

HDFS

2014-03-12 17:40:07

GlusterFS分布式文件系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號

蜜桃视频在线观看网站| 国产网友自拍视频| 亚洲三级在线| 一区二区激情小说| 久久久久综合一区二区三区| 特级西西444www高清大视频| 牛夜精品久久久久久久99黑人| 亚洲国产成人爱av在线播放| 91av俱乐部| 宅男在线观看免费高清网站 | 国产美女视频免费看| 男女视频在线| 中文字幕精品一区| 国产精成人品localhost| 成人午夜精品视频| 亚洲无线一线二线三线区别av| 亚洲偷欧美偷国内偷| 日本黄色www| 视频二区不卡| 午夜视频一区二区三区| 性欧美大战久久久久久久免费观看 | 日本精品一区二区三区高清| 男人日女人的bb| 成人精品一区| 91蝌蚪porny| 国产传媒一区| 国产欧美综合视频| 麻豆高清免费国产一区| 97av在线视频| 精品一区二区三区四| 日韩毛片视频| 亚洲天堂日韩电影| 国产 中文 字幕 日韩 在线| 日本一区二区三区电影免费观看| 欧美图片一区二区三区| 欧美日韩激情视频在线观看| 麻豆蜜桃在线| 亚洲美女免费视频| 亚洲高清资源综合久久精品| 欧美日韩国产中文字幕在线| 成人免费av资源| 亚洲自拍小视频免费观看| 亚洲精品无码久久久久| 美女爽到呻吟久久久久| 久久青草精品视频免费观看| 黄色一级视频免费观看| 真实国产乱子伦精品一区二区三区| 中文字幕av一区二区| 五月天精品视频| 美女精品一区最新中文字幕一区二区三区 | 午夜视频你懂的| 日韩一区二区三区免费| 色综合欧美在线视频区| 漂亮人妻被中出中文字幕| 多野结衣av一区| 精品国产91久久久久久老师| 国内精品在线观看视频| 高清精品在线| 欧美日韩国产精品一区二区三区四区 | 国产理论在线观看| 亚洲丝袜自拍清纯另类| 综合视频免费看| 国产一二区在线| 玉足女爽爽91| 欧美又粗又长又爽做受| 678在线观看视频| 天天综合日日夜夜精品| 午夜肉伦伦影院| 二吊插入一穴一区二区| 欧美日韩成人一区| 韩国三级hd中文字幕有哪些| 成人在线tv视频| 亚洲欧美日韩精品久久亚洲区 | 国产精品久久久久久久久久ktv| 国产免费a视频| 久久精品国产77777蜜臀| 成人久久一区二区三区| 亚洲精品97久久中文字幕| 成人免费视频app| 欧美视频1区| 最近高清中文在线字幕在线观看| 亚洲色欲色欲www| 欧美精品在欧美一区二区| 成人黄色动漫| 欧美制服丝袜第一页| 中文字幕中文在线| 91久久精品无嫩草影院| 日韩精品在线私人| 麻豆精品国产免费| 99re国产精品| 国产精品日韩在线观看| 亚洲AV无码国产精品午夜字幕| 99久久国产综合精品女不卡| 视频一区二区三| av色综合久久天堂av色综合在| 欧美日韩国产一区在线| 艹b视频在线观看| 国产精品17p| 在线播放国产一区二区三区| 免费三片在线播放| 日韩成人免费电影| 精品午夜一区二区三区| 免费超碰在线| 色综合久久久久综合体桃花网| 在线视频观看91| 猛男gaygay欧美视频| 欧美成人第一页| aaa在线视频| 成人深夜视频在线观看| 亚洲精品成人a8198a| a'aaa级片在线观看| 欧美日韩精品系列| 美国黄色a级片| 国户精品久久久久久久久久久不卡| 国产91精品网站| 隣の若妻さん波多野结衣| 中文字幕一区免费在线观看| 美女福利视频在线| 成午夜精品一区二区三区软件| 色偷偷噜噜噜亚洲男人| 中日韩黄色大片| 国产成人高清视频| 丰满女人性猛交| 免费污视频在线一区| 亚洲国产精品久久久久秋霞蜜臀| 国产又粗又硬又长又爽| 日本成人中文字幕在线视频| 国新精品乱码一区二区三区18| 黄色网页在线观看| 欧美三级蜜桃2在线观看| www.自拍偷拍| 国产情侣久久| 国产伦精品一区二区三区照片91| 国产原创视频在线观看| 欧美色区777第一页| 精品成人无码一区二区三区| 国产精品女主播一区二区三区| 福利视频久久| 日本在线观看大片免费视频| 欧美二区三区91| 国产成人免费在线观看视频| 免费av网站大全久久| 午夜精品亚洲一区二区三区嫩草 | 色婷婷综合久久久中文字幕| 无遮挡aaaaa大片免费看| 一区在线免费| 国产日韩欧美一区二区| 都市激情国产精品| 国产丝袜视频一区| 天天干,天天干| 国产午夜精品一区二区三区嫩草| 日日摸天天爽天天爽视频| 亚洲黄页网站| 国产精品草莓在线免费观看| jizz在线免费观看| 欧美日韩国产综合一区二区三区| 亚洲色图欧美色| 经典三级在线一区| 7777在线视频| 久久97久久97精品免视看秋霞| 久久久久久伊人| 日本人妖在线| 欧美最猛性xxxxx直播| 99在线视频免费| 国产一区二区三区日韩 | 国产又粗又猛又爽又黄的视频四季| 日本中文字幕一区二区视频| 亚洲不卡中文字幕| 国产在视频一区二区三区吞精| 久久激情视频久久| 成人av免费播放| 亚洲不卡av一区二区三区| 久久久久久久久免费看无码| 青娱乐精品视频| 欧美精品一区二区性色a+v| 日韩中文在线| 91地址最新发布| 91精品国产综合久久久久久豆腐| 91精品国产乱码| 自拍偷拍欧美亚洲| 国产精品网站在线观看| 欧美性受xxxx黒人xyx性爽| 亚洲精品影院在线观看| 色涩成人影视在线播放| 9l亚洲国产成人精品一区二三| 欧美在线视频一区| 日本中文在线观看| 亚洲成色www8888| 丰满人妻一区二区三区四区| 一区二区三区美女视频| 蜜桃av免费看| 国产精品69毛片高清亚洲| 国产中文字幕免费观看| 国产精品99视频| 久久国产精品一区二区三区| 日韩一区二区三区四区五区 | 国产91欧美| 久久久噜噜噜久噜久久| 成人精品一区二区三区校园激情| 精品久久人人做人人爰| 国产情侣免费视频| 性做久久久久久免费观看| 四虎影视1304t| 91在线丨porny丨国产| 久久精品亚洲天堂| 久久亚洲影院| 全黄性性激高免费视频| 99久久.com| 欧美精品一区二区三区四区五区| 欧美国产中文高清| 国产精品盗摄久久久| 123区在线| 欧美超级免费视 在线| 成人高清免费在线播放| 亚洲精品国产综合区久久久久久久| 一级全黄少妇性色生活片| 欧美性色xo影院| 国产精品99精品无码视| 一区二区三区蜜桃网| 国产精品视频一区二区在线观看| 久久久国产综合精品女国产盗摄| 国产精九九网站漫画| 久久99国产精品久久| 国产97色在线 | 日韩| 翔田千里一区二区| 欧美 日韩 国产在线观看| 亚洲特色特黄| 久久久久久久香蕉| 欧美+亚洲+精品+三区| 一区不卡字幕| 成人6969www免费视频| 日本高清不卡三区| 免费国产自久久久久三四区久久| 国产伦精品一区二区三区视频孕妇 | 欧美 日韩 国产在线| 日韩美女毛片| 久久精品国产一区二区三区不卡| 操欧美女人视频| 99三级在线| 国产成人一二片| 国产乱码一区| 红杏一区二区三区| 激情小说网站亚洲综合网| 欧洲精品一区| 免费av在线一区二区| 亚洲成a人片77777在线播放| 久久99精品久久久久久久久久 | jizz免费一区二区三区| 国产精品久久久久久久美男| 亚洲成av在线| 国产精品综合不卡av| 日本久久二区| 亚洲一区二区久久久久久久| 日韩视频一二区| 国产九色91| 伊人成综合网yiren22| 日韩av一区二区三区在线| jiujiure精品视频播放| 亚洲一区二区三区午夜| 亚洲精品国产成人影院| 女同性恋一区二区| 激情综合中文娱乐网| 欧美日韩亚洲一| 麻豆精品视频在线观看视频| www.污网站| 成人精品在线视频观看| 亚洲av无码国产精品久久| 国产三级精品三级| 国产精品视频一区二区在线观看| 一区二区欧美国产| 可以免费在线观看的av| 欧美视频在线观看一区| 99热这里是精品| 亚洲精品久久在线| av网站大全在线观看| 久久成人一区二区| 182在线视频观看| 国产精品av网站| 日韩一级淫片| 欧美一区1区三区3区公司| 91精品国产自产在线观看永久∴ | 日韩精品一区二区三区三区免费| 熟妇人妻一区二区三区四区 | 日日夜夜天天综合入口| 欧日韩在线观看| 国产亚洲高清一区| 免费在线成人av| 欧美激情亚洲| 亚洲精品一二三四五区| 粉嫩蜜臀av国产精品网站| 亚洲AV无码国产成人久久| 亚洲免费av高清| 9i精品福利一区二区三区| 日韩午夜中文字幕| 黄色小视频在线免费观看| 精品少妇一区二区30p| 欧美不卡高清一区二区三区| 福利视频久久| 国产精品国产一区| 免费在线观看毛片网站| 国产成人免费视频网站| 亚洲一区 欧美| 欧美性猛交xxxx黑人| 亚洲国产精品一| 日韩在线观看网址| 另类图片综合电影| 国产精品精品软件视频| 国产二区精品| 日韩无套无码精品| av在线综合网| 欧美色图亚洲天堂| 欧美日韩第一区日日骚| 美女欧美视频在线观看免费 | 成人av网站在线| 青娱乐91视频| 欧美日韩精品二区第二页| 青梅竹马是消防员在线| 国内精品久久久| 久久国产三级| 亚洲成人自拍| 久久成人精品| 亚洲の无码国产の无码步美| 亚洲尤物视频在线| 国产av无码专区亚洲av麻豆| 最近更新的2019中文字幕| 色尼玛亚洲综合影院| 久久青青草原| 国产视频亚洲| 女同性恋一区二区三区| 亚洲综合激情小说| 精品久久久久久亚洲综合网站| 久久亚洲精品毛片| 伊人亚洲精品| 在线观看国产一区| 老色鬼精品视频在线观看播放| 少妇精品无码一区二区免费视频| 色婷婷一区二区| 日本一级在线观看| 青草成人免费视频| 自拍偷拍欧美一区| 免费黄色特级片| 久久久精品国产免费观看同学| 亚洲免费在线视频观看| 亚洲乱码国产乱码精品精天堂| 自拍视频在线看| 日本高清一区| 日本不卡一区二区三区高清视频| 欧美黄色一级生活片| 欧美亚洲动漫制服丝袜| 中文字幕在线观看日本| 国产美女91呻吟求| 91成人精品| 国产精品果冻传媒| 精品久久久久久久久久久久久久| 五月天激情婷婷| 日本久久中文字幕| 欧美日韩国产高清电影| 手机看片福利日韩| 中文字幕日本不卡| www.国产三级| 97国产suv精品一区二区62| 偷拍亚洲精品| 超碰在线播放91| 一区二区三区在线不卡| 欧美亚洲精品在线观看| 欧美在线欧美在线| 日韩在线视频精品| 黑人无套内谢中国美女| 午夜国产不卡在线观看视频| 欧美偷拍视频| 国产欧美一区二区三区在线| 午夜精品久久| 久久丫精品国产亚洲av不卡| 欧美日韩一区国产| 黄页网站在线| 日本一区二区在线视频| 国产在线视频一区二区| 日本va欧美va国产激情| 在线日韩欧美视频| 亚洲电影一区| 欧美精品成人网| 亚洲精品免费电影| 你懂的免费在线观看| 91丨九色丨国产在线| 国产一级一区二区| 国产精品夜夜夜爽阿娇| 日韩精品久久久久久福利| 色成人综合网| 国产视频一视频二| 亚洲视频网在线直播| 嫩草在线播放| 99久久精品无码一区二区毛片 | www.四虎在线| 在线免费观看日本欧美| 影院在线观看全集免费观看| 日本不卡一区二区三区视频| 国产成人精品综合在线观看 | 亚洲风情亚aⅴ在线发布| 黑人一区二区三区| 亚洲中文字幕无码专区|