精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

HBase Compaction 原理與線上調優實踐

大數據
本文對 HBase Compaction 的原理、流程以及限流的策略進行了詳細的介紹,列舉了幾個線上進行調優的案例,最后對 Compaction 的相關參數進行了總結。

一、Compaction 介紹

HBase 是基于一種 LSM-Tree(Log-Structured Merge Tree)體系架構的存儲模型設計的,寫入時先寫入 WAL(Write-Ahead-Log)日志,再寫入 Memstore 緩存,滿足一定條件后,會執行 Flush 操作將緩存數據刷寫到磁盤,生成一個 HFile 數據文件。隨著數據不斷寫入,HFile 文件會越來越多,文件太多導致查詢數據時 IO 次數增加,進而影響到 HBase 的查詢性能。為了優化讀的性能,采用合并小 HFile 的方法來減少文件數量,這種合并 HFile 的操作就稱為 Compaction。Compaction 是從一個 Region 的一個 Store 中選擇部分 HFile 文件進行合并的過程。合并原理是從這些待合并的數據文件中依次讀出 KeyValue,由小到大排序后寫入一個新的文件中。之后這個新生成的文件就會取代之前已合并的所有文件對外提供服務。

1.1 Compaction 的分類

HBase 根據合并規模將 Compaction 分為兩類:Minor Compaction 和 Major Compaction。

  • Minor Compaction 是指選取部分小的、相鄰的 HFile,將它們合并成一個更大的 HFile;
  • Major Compaction 是指將一個Store 中所有的 HFile 合并成一個 HFile,這個過程會清理三種無意義的數據:TTL 過期數據、被刪除的數據與版本號超過設定版本號的數據。

下圖形象的描述了2種 Compaction 的區別:

圖片

一般情況下,Major Compaction 持續時間比較長,整個過程消耗大量系統資源,因此線上數據量較大的業務通常推薦關閉自動觸發 Major Compaction 功能,改為在業務低峰期手動觸發(或設置策略自動在低峰期觸發)。

1.2 Compaction的意義

  1. 合并小文件,減少文件數,提升讀取性能,穩定隨機讀延遲;
  2. 合并的時候會讀取遠程 DataNode 上的文件寫入本地 DataNode,提高數據的本地化率;
  3. 清除過期數據和被刪除的數據,減少表的存儲量。

1.3 Compaction觸發時機

HBase 中觸發 Compaction 的時機有很多種,最常見的觸發時機有三種:后臺線程周期性檢查時觸發、MemStore Flush 觸發以及手動觸發。

(1)后臺線程周期性檢查:后臺線程 CompactionChecker 會定期檢查是否需要執行 Compaction,檢查周期為

hbase.server.thread.wakefrequency *hbase.server.compactchecker.interval.multiplier,這里主要考慮的是一段時間內沒有寫入請求導致 Flush 觸發不了 Compaction 的情況。其中參數 hbase.server.thread.wakefrequency 默認值是10s,是 HBase 服務端線程喚醒時間間隔,參數 hbase.server.compactchecker.interval.multiplier 默認值1000,是 Compaction 操作周期性檢查乘數因子。10 * 1000 s 約等于 2hrs 46mins 40sec。

(2)MemStore Flush:Compaction 的根源在于 Flush,MemStore 達到一定閾值就會觸發 Flush ,將內存中的數據刷寫到磁盤生成 HFile 文件,隨著 HFile 文件越來越多就需要執行 Compaction。HBase 每次 Flush之后,都會判斷是否需要進行 Compaction,一旦滿足 Minor Compaction 或 Major Compaction 的條件便會觸發執行。

(3)手動:是指通過 HBase API、HBase Shell 或者 Master UI 界面等方式執行 compact、major_compact 等命令。

二、Compaction流程

了解完基本的背景后,接下來介紹 Compaction 的整個過程。

  • RegionServer 啟動一個 Compaction 檢查線程,定期對 Region 的 Store 進行檢查;
  • Compaction 始于特定的觸發條件。一旦觸發,HBase 會將該 Compaction 交由一個獨立的線程處理;
  • 從對應的 Store 中選擇合適的 HFile 文件,這步是整個 Compaction 的核心,選取文件時需要遵循很多條件,比如文件數既不能太多也不能太少、文件大小不能太大等,盡可能地選取承載 IO 負載重的文件集。基于此,HBase 實現了多種文件選取策略:常用的有
    RatioBasedCompactionPolicy、
    ExploringCompactionPolicy
    StripeCompactionPolicy 等,也支持自定義的 Compaction 算法;
  • 選出待合并的文件后,會根據這些 HFile 文件的總大小選擇對應的線程池來進行處理;
  • 對這些文件執行具體的 Compaction 操作。

下圖簡單的描述了上述流程。

圖片

下面對圖2中具體的每一步進行詳細說明。

2.1 啟動 Compaction 定時線程

在 RegionServer 啟動時,會初始化 CompactSplitThread 線程以及定時檢查的 CompactionChecker ,默認10s執行一次。

// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);

其中 CompactSplitThread 是用來實現 Compaction 以及 Split 流程的類,而 CompactChecker 是用來周期性檢查是否執行 Compaction 的。

CompactionChecker 是 ScheduledChore 類型,而 ScheduledChore 是 HBase定期執行的一個 Task。

2.2 觸發 Compaction

Compaction 的觸發時機在上面已經介紹過,下面對這3種觸發機制進行詳細的介紹。

2.2.1 后臺線程周期性檢查

后臺線程 CompactionChecker 定期檢查是否需要執行 Compaction,檢查周期為:hbase.regionserver.compaction.check.period(默認10s)。

(1)首先檢查文件數是否大于可執行 Compaction 的文件數,一旦大于就會觸發 Compaction。

(2)如果不滿足,會接著檢查是否到了 Major Compaction 的執行周期。如果當前 Store 中 HFile 的最早更新時間早于某個值 mcTime,就會觸發 Major Compaction,其中 mcTime 是一個浮動值,浮動區間默認為[7-7*0.2,7+7*0.2],其中7為配置項 hbase.hregion.majorcompaction 設置,0.2為配置項 hbase.hregion.

majorcompaction.jitter,所以在7天左右就會執行一次 Major Compaction。用戶如果想禁用 Major Compaction,只需要將參數hbase.hregion

.majorcompaction 設為0。

(3)如果到了 Major Compaction 的執行周期:

  • 首先判斷有幾個 HFile 文件,如果只有1個文件,會判斷是否有過期數據、本地化率是否比較低,如果都不滿足就不做 Major
    Compaction;
  • 如果大于1個文件,也會做 Major
    Compaction。

后臺線程周期性檢查的流程如圖3所示。

圖片

下面是該線程的關鍵代碼:

//ScheduledChore的run方法會一直調用chore函數
@Override
protected void chore() {
  // 遍歷instance下的所有online的region 進行循環檢測
  // onlineRegions是HRegionServer上存儲的所有能夠提供有效服務的在線Region集合;
  for (HRegion r : this.instance.onlineRegions.values()) {
    if (r == null)
      continue;
    // 取出每個region的store
    for (Store s : r.getStores().values()) {
      try {
        // 檢查是否需要compact的時間間隔 hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency,multiplier默認1000;
        long multiplier = s.getCompactionCheckMultiplier();
        assert multiplier > 0;
        // 未到multiplier的倍數跳過,每當迭代因子iteration為合并檢查倍增器multiplier的整數倍時,才會發起檢查
        if (iteration % multiplier != 0) continue;
        // 需要合并的話,發起SystemCompaction請求,此處最終比較的是是否當前hfile數量減去正在compacting的文件數大于設置的compact min值。若滿足則執行systemcompact
        if (s.needsCompaction()) {
          // Queue a compaction. Will recognize if major is needed.
          this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
              + " requests compaction");
        } else if (s.isMajorCompaction()) {
          if (majorCompactPriority == DEFAULT_PRIORITY
              || majorCompactPriority > r.getCompactPriority()) {
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
                + " requests major compaction; use default priority", null);
          } else {
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
                + " requests major compaction; use configured priority",
              this.majorCompactPriority, null);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed major compaction check on " + r, e);
      }
    }
  }
  iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}

2.2.2 Memstore Flush 觸發

Memstore Flush 會產生 HFile 文件,文件越來越多就需要 Compaction。因此在每次執行完 Flush 操作之后,都會對當前 Store 中的文件數進行判斷,一旦文件數超過 Compaction 的閾值 ,就會觸發 Compaction。這里需要強調的是,Compaction 是以 Store 為單位進行的,而在 Flush 觸發條件下,整個 Region 的所有 Store 都會執行 Compaction,所以會在短時間內可能會執行多次 Compaction。下面是 Flush 操作觸發 Compaction 的代碼。

/**
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   * @param forceFlushAllStores whether we want to flush all store.
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(final Region region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }
    lock.readLock().lock();
    try {
      // flush
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flush(forceFlushAllStores);
     // 檢查是否需要compact
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      // 檢查是否需要split
      boolean shouldSplit = ((HRegion)region).checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        // 發起compact請求
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" + (region != null ? (" for region " +
          Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
    }
    return true;
  }

2.2.3 手動觸發

手動觸發就是通過命令或者 API 接口手動觸發 Compaction,手動觸發的原因有三個:

  • 很多業務擔心自動 Major Compaction 影響讀寫性能,因此會選擇低峰期手動觸發;
  • 用戶在執行完修改ttl的屬性后希望立刻生效,執行手動觸發 Major Compaction;
  • 硬盤容量不夠的情況下手動觸發 Major Compaction 刪除大量過期數據。

大多數都是基于第1點原因進行手動觸發。

2.3 選擇待合并的文件

Compaction 的核心就是選擇合適的文件進行合并,因為合并文件的大小以及其當前承載的 IO 直接決定了 Compaction 的效果。希望能找到這樣的文件:承載了大量 IO 請求但是文件大小很小,這樣 Compaction 本身不會消耗太多 IO,而且合并完成之后對讀的性能會有顯著提升。現實情況可能大部分都不會是這樣。目前 HBase 提供了多種 Minor Compaction 文件選擇策略,通過配置項 hbase.hstore.engine.class 設置。不管哪種策略,在執行之前都要做對文件做一些篩選操作,排除不符合條件的文件,以減少 Compaction 的工作量,減少對讀寫的影響。

  • 排除當前正在執行 Compaction 的文件;
  • 如果一個文件所有的記錄都已經過期,則直接將文件刪除;
  • 排除過大的單個文件,如果文件大小大于 hbase.hstore.compaction.max.size(默認Long最大值)則被排除,不排除會產生大量 IO 消耗。

排除完后剩下的文件稱為候選文件,接下來會再判斷是否滿足 Major Compaction 條件,如果滿足,就會選擇全部文件進行合并。判斷條件有下面三條,只要滿足其中一條就會執行 Major Compaction:

  • 到了 Compaction 自動執行的周期且候選文件數小于 hbase.hstore.compaction
    .max(默認10),如果關掉自動 Major Compaction 執行則不適用;
  • Store 中含有 Reference 文件,Reference 文件是 Split Region 產生的臨時引用文件,在 Compaction 過程中刪除;
  • 用戶手動執行的 Major Compaction。

如果不滿足上述執行條件,則為 Minor compaction。Minor Compaction 的策略有很多種,下面重點介紹 

RationBasedCompactionPolicy(0.98之前的版本)、ExploringCompactionPolicy(0.98之后默認的版本) 和 StripeCompactionPolicy 的執行策略。

2.3.1 Compaction文件選擇策略的建模

所謂的 Compaction 文件選擇策略可以建模為下面的問題:

圖片

圖中的每個數字表示了文件的 Sequence ID,數字越大則文件越新,很有可能剛剛Flush而成,意味著文件 Size 也可能越小。這樣的文件在 Compaction 時優先選擇,因此 Store下的 Storefile 文件會依據 Sequence ID 從小到大排序,依次標記為 f[0]、f[1]。。。。f[n-1],篩選策略就是要確定一個連續范圍 [Start, End] 內的 Storefile 參與 Compaction。

Compaction 的目的是減少文件數量和刪除無用的數據,優化讀性能,Compaction 實現是將原文件的內容重寫到新的文件,如果文件過大意味著 Compaction 的時間長,Compaction 過程中產生的 IO 放大越明顯,因此文件篩選的準則是用最小的 IO 代價去合并減少最多的文件數。

Compaction 依賴兩個先決條件:

  • 所有 StoreFile 按照順序進行排序(此順序為:老文件在前,新文件在后);
  • 參與 Compaction 的文件必須是連續的。

2.3.2 RationBasedCompactionPolicy

基本思想就是選擇在固定 End 為最后一個文件的前提下(一般情況),從隊列頭開始滑動尋找 Start,直到 Start 滿足下面的條件之一便停止掃描:

  1. 當前文件大小 < 比當前文件新的所有文件大小總和 * ratio,就是滿足公式 f[start].size <= ratio * (f[start+1].size +.......+ f[end-1].size)。其中 ration 是一個可變的比例,高峰期 ration 為1.2,非高峰期 ration 為5,非高峰期允許合并更大的文件。
    可以通過參數 hbase.offpeak.start.hour 和 hbase.offpeak.end.hour 設置高峰期時間段。
  2. 當前所剩候選文件數 >= hbase.store.compaction.min(默認為3),因為要保證本次 Compaction 的時候文件個數要大于配置的 Compaction 最小值。

下面附上 RationBasedCompactionPolicy 的具體邏輯代碼。

/**
  * @param candidates pre-filtrate
  * @return filtered subset
  * -- Default minor compaction selection algorithm:
  * choose CompactSelection from candidates --
  * First exclude bulk-load files if indicated in configuration.
  * Start at the oldest file and stop when you find the first file that
  * meets compaction criteria:
  * (1) a recently-flushed, small file (i.e. <= minCompactSize)
  * OR
  * (2) within the compactRatio of sum(newer_files)
  * Given normal skew, any newer files will also meet this criteria
  * <p/>
  * Additional Note:
  * If fileSizes.size() >> maxFilesToCompact, we will recurse on
  * compact().  Consider the oldest files first to avoid a
  * situation where we always compact [end-threshold,end).  Then, the
  * last file becomes an aggregate of the previous compactions.
  *
  * normal skew:
  *
  *         older ----> newer (increasing seqID)
  *     _
  *    | |   _
  *    | |  | |   _
  *  --|-|- |-|- |-|---_-------_-------  minCompactSize
  *    | |  | |  | |  | |  _  | |
  *    | |  | |  | |  | | | | | |
  *    | |  | |  | |  | | | | | |
  */
ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
  if (candidates.isEmpty()) {
    return candidates;
  }
  // we're doing a minor compaction, let's see what files are applicable
  int start = 0;
  // 獲取文件合并比例:取參數hbase.hstore.compaction.ratio,默認為1.2
  double ratio = comConf.getCompactionRatio();
  if (mayUseOffPeak) {
    // 取參數hbase.hstore.compaction.ratio.offpeak,默認為5.0
    ratio = comConf.getCompactionRatioOffPeak();
    LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
  }
  // get store file sizes for incremental compacting selection.
  final int countOfFiles = candidates.size();
  long[] fileSizes = new long[countOfFiles];
  long[] sumSize = new long[countOfFiles];
  for (int i = countOfFiles - 1; i >= 0; --i) {
    StoreFile file = candidates.get(i);
    fileSizes[i] = file.getReader().length();
    // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
    // tooFar表示后移動最大文件數位置的文件大小,也就是剛剛滿足達到最大文件數位置的那個文件,從i至tooFar數目為合并時允許的最大文件數
    int tooFar = i + comConf.getMaxFilesToCompact() - 1;
    sumSize[i] = fileSizes[i]
      + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
      - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
  }
  // 倒序循環,如果文件數目滿足最小合并時允許的最小文件數,且該位置的文件大小大于合并時允許的文件最小大小與下一個文件窗口文件總大小乘以一定比例中的較大者,則繼續;
  // 實際上就是選擇出一個文件窗口內能最小能滿足的文件大小的一組文件
  while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
    fileSizes[start] > Math.max(comConf.getMinCompactSize(),
        (long) (sumSize[start + 1] * ratio))) {
    ++start;
  }
  if (start < countOfFiles) {
    LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
      + " files from " + countOfFiles + " candidates");
  } else if (mayBeStuck) {
    // We may be stuck. Compact the latest files if we can.保證最小文件數目的要求
    int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
    if (filesToLeave >= 0) {
      start = filesToLeave;
    }
  }
  candidates.subList(0, start).clear();
  return candidates;
}

2.3.3 ExploringCompactionPolicy

該策略繼承自 RatioBasedCompactionPolicy,不同的是 Ration 策略在找到一個合適的文件集合之后就停止掃描了,而 Exploring 策略會把 Storefile 列表劃分成多個子隊列,從中找出一個最優解參與 Compaction。最優解可以理解為:待合并文件數最多或者待合并文件數相同的情況下文件較小,這樣有利于減少 Compaction 帶來的 IO 消耗。算法流程可以描述為:

  1. 從頭到尾遍歷文件,判斷所有符合條件的組合;
  2. 選擇組合內文件數 >= minFiles,且 <= maxFiles;
  3. 計算各組合文件的總大小 size,選擇組合 size <= MaxCompactSize,且 >= minCompactSize;
  4. 每個組合里面的每一個文件大小都必須滿足 FileSize(i) <= (sum(0,N,FileSize(_)) - FileSize(i)) * ration,意義在于去掉很大的文件,每次 Compaction 時應該盡量合并一些大小較小的文件;
  5. 滿足以上 1-4 條件的組合里面選擇文件數最多,文件數一樣多時進一步選擇文件總 size 最小的,目的在于盡可能多地合并文件并且 Compaction 帶來的 IO 壓力越小越好。

下面附上 ExploringCompactionPolicy 的具體邏輯代碼。

public List<StoreFile> applyCompactionPolicy(final List<StoreFile> candidates,
       boolean mightBeStuck, boolean mayUseOffPeak, int minFiles, int maxFiles) {
  
    final double currentRatio = mayUseOffPeak
        ? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
    // Start off choosing nothing.
    List<StoreFile> bestSelection = new ArrayList<StoreFile>(0);
    List<StoreFile> smallest = mightBeStuck ? new ArrayList<StoreFile>(0) : null;
    long bestSize = 0;
    long smallestSize = Long.MAX_VALUE;
    int opts = 0, optsInRatio = 0, bestStart = -1; // for debug logging
    // Consider every starting place. 從頭到尾遍歷文件
    for (int start = 0; start < candidates.size(); start++) {
      // Consider every different sub list permutation in between start and end with min files.
      for (int currentEnd = start + minFiles - 1;
          currentEnd < candidates.size(); currentEnd++) {
        List<StoreFile> potentialMatchFiles = candidates.subList(start, currentEnd + 1);
        // Sanity checks
        if (potentialMatchFiles.size() < minFiles) {
          continue;
        }
        if (potentialMatchFiles.size() > maxFiles) {
          continue;
        }
        // Compute the total size of files that will
        // have to be read if this set of files is compacted. 計算文件大小
        long size = getTotalStoreSize(potentialMatchFiles);
  
        // Store the smallest set of files.  This stored set of files will be used
        // if it looks like the algorithm is stuck. 總size最小的
        if (mightBeStuck && size < smallestSize) {
          smallest = potentialMatchFiles;
          smallestSize = size;
        }
        if (size > comConf.getMaxCompactSize(mayUseOffPeak)) {
          continue;
        }
        ++opts;
        if (size >= comConf.getMinCompactSize()
            && !filesInRatio(potentialMatchFiles, currentRatio)) {
          continue;
        }
        ++optsInRatio;
        if (isBetterSelection(bestSelection, bestSize, potentialMatchFiles, size, mightBeStuck)) {
          bestSelection = potentialMatchFiles;
          bestSize = size;
          bestStart = start;
        }
      }
    }
    if (bestSelection.size() == 0 && mightBeStuck) {
      LOG.debug("Exploring compaction algorithm has selected " + smallest.size()
          + " files of size "+ smallestSize + " because the store might be stuck");
      return new ArrayList<StoreFile>(smallest);
    }
    LOG.debug("Exploring compaction algorithm has selected " + bestSelection.size()
        + " files of size " + bestSize + " starting at candidate #" + bestStart +
        " after considering " + opts + " permutations with " + optsInRatio + " in ratio");
    return new ArrayList<StoreFile>(bestSelection);
  }

2.3.4 StripeCompactionPolicy

Stripe Compaction (HBASE-7667)還是為了減少 Major Compaction 的壓力而提出的。其思想是:減少 Major Compaction 壓力最直接辦法是減少 Region 的大小,最好整個集群都是由很多小 Region 組成,這樣參與 Compaction 的文件總大小就必然不會太大。可是 Region 設置過小會導致 Region 數量很多,這一方面會導致 HBase 管理 Region 的開銷很大,另一方面 Region 過多也要求 HBase 能夠分配更多的內存作為 Memstore 使用,否則有可能導致整個 RegionServer 級別的 Flush,進而引起長時間的寫阻塞。因此單純地通過將 Region 大小設置過小并不能本質解決問題。

(1) Level Compaction

社區開發者借鑒了 Leveldb 的 Compaction 策略 Level Compaction。Level Compaction 設計思路是將 Store 中的所有數據劃分為很多層,每一層都會有一部分數據,如下圖所示:

圖片

數據組織形式不再按照時間前后進行組織,而是按照 KeyRange 進行組織,每個 KeyRange 中會包含多個文件,這些文件所有數據的 Key 必須分布在同一個范圍。比如 Key 分布在 Key0~KeyN 之間的所有數據都會落在第一個 KeyRange 區間的文件中,Key 分布在 KeyN+1~KeyT 之間的所有數據會分布在第二個區間的文件中,以此類推。

整個數據體系會被劃分為很多層,最上層(Level 0)表示最新數據,最下層(Level 6)表示最舊數據。每一層都由大量 KeyRange 塊組成(Level 0除外),KeyRange 之間沒有 Key 重合。而且層數越大,對應層的每個 KeyRange 塊大小越大,下層 KeyRange 塊大小是上一層大小的10倍。圖中 Range 顏色越深,對應的 Range 塊越大。

數據從 Memstore 中 Flush 之后,會首先落入 Level 0,此時落入 Level 0 的數據可能包含所有可能的 Key。此時如果需要執行 Compaction,只需要將 Level 0 中的 KV 一個一個讀出來,然后按照 Key 的分布分別插入 Level 1 中對應 KeyRange 塊的文件中,如果此時剛好 Level 1 中的某個KeyRange 塊大小超過了一定閾值,就會繼續往下一層合并。

Level Compaction 依然會有 Major Compaction 的概念,發生 Major Compaction 只需要將 Range 塊內的文件執行合并就可以,而不需要合并整個 Region 內的數據文件。

可見,這種 Compaction 在合并的過程中,從上到下只需要部分文件參與,而不需要對所有文件執行 Compaction 操作。另外,Level Compaction 還有另外一個好處,對于很多只讀最近寫入數據’的業務來說,大部分讀請求都會落到 Level 0,這樣可以使用 SSD 作為上層 Level 存儲介質,進一步優化讀。然而,這種 Compaction 因為 Level 層數太多導致 Compaction 的次數明顯增多,經過測試,發現這種 Compaction 并沒有對 IO 利用率有任何提升。

(2)Stripe Compaction

雖然原生的 Level Compaction 并不適用于 HBase,但是這種 Compaction 的思想卻激發了HBase 研發者的靈感,再結合之前提到的小 Region 策略,就形成了 Stripe Compaction。

同 Level Compaction 相同,Stripe Compaction 會將整個 Store 中的文件按照 Key 劃分為多個 Range,稱為 Stripe,Stripe 的數量可以通過參數設定,相鄰的 Stripe 之間 Key 不會重合。Stripe 類似于 Sub-Region 的概念,即將一個大 Region 切分成了很多小的 Sub-Region。

隨著數據寫入,Memstore 執行 Flush 之后形成 HFile,這些 HFile 并不會馬上寫入對應的 Stripe,而是放到一個稱為 L0 的地方,用戶可以配置 L0 可以放置 HFile 的數量。一旦 L0 放置的文件數超過設定值,系統就會將這些 HFile 寫入對應的 Stripe:首先讀出 HFile 的 KVs,再根據每個 KV 的 Key 定位到具體的 Stripe,將該 KV 插入對應 Stripe 的文件中即可,如圖6所示。由于 Stripe 是個小的 Region,所以 Compaction 并不會太多消耗系統資源。另外,讀取數據時,根據對應的 Key 查找到對應的 Stripe,然后在 Stripe 內部執行查找,因為 Stripe 內數據量相對很小,所以一定程度上也可以提升數據查找性能。

圖片

2.4 執行 Compaction 操作

挑選好待合并文件后,就是執行真正的合并。合并流程主要分為以下幾步:

  1. 按順序讀出待合并所有 HFile 文件的 KV,并順序寫到位于./tmp 目錄下的臨時文件中;
  2. 將臨時文件移動到對應 Region 的正式數據目錄中;
  3. 將 Compaction 的輸入文件路徑和輸出文件路徑封裝為 KV 寫入 WAL 日志,并打上 Compaction 標記,最后強制執行 sync;
  4. 將對應 Region 數據目錄下的 Compaction 的輸入文件全部刪除。

HBase對整個 Compaction 的考慮是非常全面的,上述4個步驟的每一步發生錯誤,都具有很強的容錯性和冪等性(執行一次和多次的結果相同)。

  • 如果 RS 在步驟2或步驟2之前發生異常,本次 Compaction 會被認為失敗,如果繼續進行同樣的 Compaction,上次異常對接下來的 Compaction不會有任何影響,也不會對讀寫有影響,唯一的影響就是多了一份冗余的數據;
  • 如果 RS 在步驟2之后、步驟3或步驟3之前發生異常,也僅僅會多一份冗余數據;
  • 如果在步驟3之后、步驟4之前發生異常,則 RS 在重新打開 Region 之后就會從 WAL 中看到上次 Compaction 的日志。因為此時輸入文件和輸出文件已經持久化到 HDFS,因此只需要根據 WAL 日志移除掉 Compaction 的輸入文件即可。

下面附上 Store 的 compact 方法。

public List<StoreFile> compact(CompactionContext compaction,
   CompactionThroughputController throughputController, User user) throws IOException {
   assert compaction != null;
   List<StoreFile> sfs = null;
   CompactionRequest cr = compaction.getRequest();
   try {
     // Do all sanity checking in here if we have a valid CompactionRequest
     // because we need to clean up after it on the way out in a finally
     // block below
     long compactionStartTime = EnvironmentEdgeManager.currentTime();
     assert compaction.hasSelection();
     Collection<StoreFile> filesToCompact = cr.getFiles();
     assert !filesToCompact.isEmpty();
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
       // 再次檢查
       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
     }
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
         + this + " of " + this.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + fs.getTempDir() + ", totalSize="
         + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
     // Commence the compaction.  開始compact,newFiles是合并后的新文件
     List<Path> newFiles = compaction.compact(throughputController, user);
     long outputBytes = 0L;
     // TODO: get rid of this!
     if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
       LOG.warn("hbase.hstore.compaction.complete is set to false");
       sfs = new ArrayList<StoreFile>(newFiles.size());
       final boolean evictOnClose =
           cacheConf != null? cacheConf.shouldEvictOnClose(): true;
       for (Path newFile : newFiles) {
         // Create storefile around what we wrote with a reader on it.
         StoreFile sf = createStoreFileAndReader(newFile);
         sf.closeReader(evictOnClose);
         sfs.add(sf);
       }
       return sfs;
     }
     // Do the steps necessary to complete the compaction.
     // 將newFiles移動到新的位置,返回StoreFile列表
     sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
     // 在WAL中寫入Compaction記錄
     writeCompactionWalRecord(filesToCompact, sfs);
     // 將新生成的StoreFile列表替換到StoreFileManager的storefile中
     replaceStoreFiles(filesToCompact, sfs);
     // 根據compact類型,累加相應計數器
     if (cr.isMajor()) {
       majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
       majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
     } else {
       compactedCellsCount += getCompactionProgress().totalCompactingKVs;
       compactedCellsSize += getCompactionProgress().totalCompactedSize;
     }
     for (StoreFile sf : sfs) {
       outputBytes += sf.getReader().length();
     }
     // At this point the store will use new files for all new scanners.
     // 歸檔舊文件
     completeCompaction(filesToCompact, true); // Archive old files & update store size.
     long now = EnvironmentEdgeManager.currentTime();
     if (region.getRegionServerServices() != null
         && region.getRegionServerServices().getMetrics() != null) {
       region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
         now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
         outputBytes);
     }
     // 記錄日志信息并返回
     logCompactionEndMessage(cr, sfs, now, compactionStartTime);
     return sfs;
   } finally {
     finishCompactionRequest(cr);
   }
 }

三、Compaction 的限流

上述幾種策略都是根據不同的業務場景設置對應的文件選擇策略,核心都是減少參與 Compaction 的文件數,縮短整個 Compaction 執行的時間,間接降低 Compaction 的 IO 放大效應,減少對業務讀寫的延遲影響。但是,如果不對 Compaction 執行階段的讀寫吞吐量進行限制的話也會引起短時間大量系統資源消耗,影響用戶讀寫延遲。HBase 通過限制 Compaction 速度 和 Compaction 的帶寬來對 Compaction 進行限流。

3.1 Limit Compaction Speed

該優化方案通過感知 Compaction 的壓力情況自動調節系統的 Compaction 吞吐量,在壓力大的時候降低合并吞吐量,壓力小的時候增加合并吞吐量。

基本原理為:

在正常情況下,用戶需要設置吞吐量下限參數 hbase.hstore.compaction.throughput.lower.bound (默認10MB/sec) 和上限參數 hbase.hstore.compaction.throughput.higher.bound (默認20MB/sec),實際會工作時吞吐量為 lower + (higer – lower) * ratio,其中 ratio 是一個取值范圍在0到1的小數,它由當前 Store 中待參與 Compation 的 HFile 數量決定,數量越多,ratio 越小,反之越大。

如果當前 Store中 HFile 的數量太多,并且超過了參數 blockingFileCount,此時所有寫請求就會阻塞等待 Compaction 完成,這種場景下上述限制會自動失效。

3.2 Compaction BandWidth Limit

原理其實和 Limit Compaction Speed 思路基本一致,它主要涉及兩個參數:compactBwLimit 和 numOfFilesDisableCompactLimit。

作用分別如下:

  • compactBwLimit:一次 Compaction 的最大帶寬使用量,如果 Compaction 所使用的帶寬高于該值,就會強制令其 sleep 一段時間。
  • numOfFilesDisableCompactLimit:很顯然,在寫請求非常大的情況下,限制 Compaction 帶寬的使用量必然會導致 HFile 堆積,進而會影響到讀請求響應延時。因此該值意義就很明顯,一旦 Store 中 HFile 數量超過該設定值,帶寬限制就會失效。
// 該方法進行Compaction的動態限制
private void tune(double compactionPressure) {
    double maxThroughputToSet;
    // 壓力大于1,最大限速不受限制
    if (compactionPressure > 1.0) {
      // set to unlimited if some stores already reach the blocking store file count
      maxThroughputToSet = Double.MAX_VALUE;
     // 空閑時間,最大限速為設置的Compaction最大吞吐量
    } else if (offPeakHours.isOffPeakHour()) {
      maxThroughputToSet = maxThroughputOffpeak;
    } else {
      // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
      // calculate the throughput limitation.
      // lower + (higher - lower) * ratio
      maxThroughputToSet =
          maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
              * compactionPressure;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
          + throughputDesc(maxThroughputToSet));
    }
    this.maxThroughput = maxThroughputToSet;
  }

再來看下獲取 R S的 Compaction 壓力的 getCompactionPressure 方法,其實就是遍歷每個 Region 的每個 Store,取壓力最大的。

@Override
public double getCompactionPressure() {
  double max = 0;
  for (Region region : onlineRegions.values()) {
    for (Store store : region.getStores()) {
      double normCount = store.getCompactionPressure();
      if (normCount > max) {
        max = normCount;
      }
    }
  }
  return max;
}
@Override
public double getCompactionPressure() {
  int storefileCount = getStorefileCount();
  int minFilesToCompact = comConf.getMinFilesToCompact();
  if (storefileCount <= minFilesToCompact) {
    return 0.0;
  }
  return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
}

HBase 的限流方案通過感知 Compaction 的壓力情況自動調節系統的 Compaction 吞吐量,在壓力大的時候降低合并吞吐量,壓力小的時候增加合并吞吐量。

基本原理為:

在正常情況下,用戶需要設置吞吐量下限參數 hbase.hstore.compaction.throughput.lower.bound (默認10MB/sec)   和上限參數 hbase.hstore.compaction.throughput.higher.bound(默認20MB/sec),而實際會工作在吞吐量為 lower + (higer – lower) * ratio的情況下,其中 ratio 是一個取值范圍在0到1的小數,它由當前 Store 中待參與 Compation 的 HFile 數量決定,數量越多,ratio 越小,反之越大。

如果當前 Store中 HFile 的數量太多,并且超過了 blockingFileCount 的值,該值由參數 hbase.hstore.blockingStoreFiles 配置,此時所有寫請求就會阻塞等待 Compaction 完成,這種場景下,上述限制會自動失效。

四、線上遇到的問題及調優方法

由于線上環境的復雜性,對 Compaction 模塊做了較多的優化,下面選取兩個典型案例進行說明。

4.1 關閉了自動觸發 Major Compaction,但是監控中 Major Compaction 隊列仍然有值進而影響讀寫性能

線上集群都是關閉自動觸發 Major Compaction 的功能,在業務低峰期由定時任務手動觸發 Major Compaction。在某次故障中,業務反饋讀寫性能在非執行 Major Compaction 的時段延遲比較大。查看監控發現,監控中的 Major Compaction 隊列的值比較大。

下面是當時的 Major Compaction 隊列長度和讀寫調用平均耗時的監控圖,從圖中可以很明顯地看出下面幾點:

  • Major Compaction 的隊列長度比較大的時候,讀寫的耗時也比較大;
  • Major Compaction 的隊列長度跟入流量有關系,入流量比較大的時候,Major Compaction 的隊列長度就比較大。

這里就產生了疑問,關閉了自動 Major Compaction,是什么條件觸發了 Major Compaction ?

圖片


圖片


帶著上面的疑問,我們從源碼的層面對問題進行分析。

1)首先查看了 Major Compaction 隊列長度這個指標的含義,該指標表示 longCompaction 線程池的工作隊列中等待的個數。

@Override
public int getLargeCompactionQueueSize() {
  //The thread could be zero.  if so assume there is no queue.
  if (this.regionServer.compactSplitThread == null) {
    return 0;
  }
  return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
}
public int getLargeCompactionQueueSize() {
  return longCompactions.getQueue().size();
}

2)查看 HBase 日志,發現確實有做 Major Compaction 的行為。

圖片

3)進一步排查什么時候會去調用 long Compaction 的線程池,查看 Compaction 選擇 long Compaction 和 small Compaction 隊列相關的源碼。

/**
 * @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
 * @return subset copy of candidate list that meets compaction criteria
 * @throws java.io.IOException
 */
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
    final List<StoreFile> filesCompacting, final boolean isUserCompaction,
    final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
  // Preliminary compaction subject to filters
  ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
  // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
  // able to compact more if stuck and compacting, because ratio policy excludes some
  // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
  int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
  boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
      >= storeConfigInfo.getBlockingFileCount();
  candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
  LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
      filesCompacting.size() + " compacting, " + candidateSelection.size() +
      " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
 
  // If we can't have all files, we cannot do major anyway
  boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
  if (!(forceMajor && isAllFiles)) {
    // 過濾掉大文件
    candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
    isAllFiles = candidateFiles.size() == candidateSelection.size();
  }
  ...
}

其中 skipLargeFiles 方法對待合并文件進行過濾,去掉大文件,該閾值是由 

maxCompactSize =

conf.getLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, Long.MAX_VALUE)配置,默認是 Long.MAX_VALUE。

/**
 * @param candidates pre-filtrate
 * @return filtered subset
 * exclude all files above maxCompactSize
 * Also save all references. We MUST compact them
 */
private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
  boolean mayUseOffpeak) {
  int pos = 0;
  while (pos < candidates.size() && !candidates.get(pos).isReference()
    && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
    ++pos;
  }
  if (pos > 0) {
    LOG.debug("Some files are too large. Excluding " + pos
        + " files from compaction candidates");
    candidates.subList(0, pos).clear();
  }
  return candidates;
}

之后再通過待合并文件的大小來選擇 long Compaction 線程池還是 small Compaction 的線程池。

@Override
public boolean throttleCompaction(long compactionSize) {
  return compactionSize > comConf.getThrottlePoint();
}

這個閾值的計算方法如下,默認是2.5G,就是說如果待合并的文件大小大于2.5G,就會放到 long Compaction 的線程池中去執行。

throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());

4)查看 ReigonServer 該時間段的日志,發現有大量大于 2.5G 的文件在 Compaction,這就解釋了為什么RS日志中該時間段并沒有做 Major Compaction 的日志但是 long Compaction 隊列有值的問題。

圖片

至此,問題原因就找到了,入流量的增加導致單個 HFile 文件比較大,Flush 之后做 Minor Compaction 的時候如果待合并文件總大小大于2.5G(默認值)的時候,會將此次 Minor Compaction 放入到 long Compaction 的線程池中執行。待合并的文件比較大導致磁盤 IO 消耗比較高,進而影響到讀寫性能。

5)措施

我們調整了 Compaction 的參數 

hbase.hstore.compaction.max.size 將該值修改為2G,表示在 Minor Compaction 的時候大于 2G 的 HFile 將會被排除,等到業務低峰期的時候再對大于2G的文件合并,減少 Compaction 對磁盤 IO 的影響。

6)效果

調整之后,在非手動觸發 Major Compaction 期間就很少有占用 long Compaction 線程池的情況出現了,讀寫平均耗時也降到了50ms以下。

圖片


圖片

4.2 定時手動觸發的 Major Compation 任務執行時間過長

業務反饋某張表的讀寫性能最近有點慢,通過監控查看到該表的存儲一直在增長,存儲單副本達到了578TB。查看表信息,該表的TTL設置的15天,該表的輸入流量也沒有明顯的增加。監控圖如下:

圖片


圖片

于是懷疑每天的 Compaction 任務沒有做完,導致過期數據未能完全刪除。查看線上配置,Major Compaction 的線程池大小是1,該表的數據量又比較大。于是調整了 Compaction 線程池的大小為10,并且設置了集群的空閑時間 hbase.offpeak.start.hour 與 hbase.offpeak

.end.hour,在這個時間段內 Compaction 的時候可以增加待合并文件大小。調整完成后,通過監控查看 Compaction 的效果對比圖,可以看到 Compaction 的工作量明顯增大了。

圖片

查看該表所占存儲的大小,可以看到該表已經從 578T 下降到了 349T,下降幅度達到了40%。業務的讀寫耗時也恢復正常。Compaction 的參數比較重要, 在調整的時候需要考慮對業務是否有影響,調整之后要多觀察業務的耗時情況,可以循序漸進的對參數進行調整。

五、Compaction相關參數介紹

下面附上 Compaction 相關的參數,線上環境可以根據實際情況進行調整。

圖片

六、總結

Compaction 是 HBase 提升讀寫性能非常重要的手段,而 Compaction 的邏輯又比較復雜,并且使用不當,會導致寫放大,進而會影響到正常的讀寫請求。本文重點介紹了 Compaction 的觸發機制、Compaction 發展過程中出現的多種合并策略、待合并文件的選擇算法、 Compaction 的限流以及 Compaction 相關的參數做了詳細的描述,最后選擇線上的2個案例,介紹了具體的分析思路和調優的方法,經調優后,性能得到了成倍的提升,保障了業務高效、穩定的運行。

責任編輯:龐桂玉 來源: vivo互聯網技術
相關推薦

2021-07-12 09:17:54

Memory Comp系統內存

2021-11-21 23:03:38

jvm調優虛擬機

2020-05-22 09:12:46

HTTP3網絡協議

2025-02-08 08:10:00

2024-05-10 11:35:22

Redis延時隊列數據庫

2025-02-06 08:24:25

AQS開發Java

2009-06-08 16:52:00

2017-04-17 15:48:15

Cinder備份實踐

2023-02-26 11:50:04

Hbase程序Oracle

2017-06-16 09:39:32

優酷實踐阿里云

2017-12-20 15:10:09

HBaseHadoop數據

2021-12-20 00:03:38

Webpack運行機制

2020-09-03 14:30:40

Tomcat 拆解調優

2023-02-22 07:04:05

自動機原理優化實踐

2009-07-24 13:54:39

MVVM模式

2023-10-29 16:26:27

Python動查重

2017-05-04 16:35:45

2011-07-08 16:02:54

HBase

2010-03-30 14:16:32

無線上網卡原理

2021-11-07 23:49:19

SQL數據庫工具
點贊
收藏

51CTO技術棧公眾號

久久久久av| 666av成人影院在线观看| 激情五月激情综合网| 精品精品国产国产自在线| 天天操天天爱天天爽| av电影在线观看| 国产一区二区按摩在线观看| 欧美另类在线观看| 91黄色免费视频| 韩国三级一区| 亚洲欧洲精品一区二区三区 | 欧美日韩精品一区视频| 一区二区三区四区国产| 国产成人精品毛片| 99精品免费| 一本色道久久综合狠狠躁篇怎么玩 | 99热国产在线| www.性欧美| 国产精品 欧美在线| 日本免费网站视频| 精品一区二区男人吃奶| 日韩欧美亚洲一二三区| 色噜噜狠狠一区二区三区| 国产精品人妻一区二区三区| 亚洲精品1区| 精品视频在线导航| gai在线观看免费高清| 成人在线免费观看黄色| 国产精品免费视频观看| 国产精品一区二区三区在线| 日本三级一区二区三区| 欧美精品午夜| 在线看日韩欧美| 99久久人妻精品免费二区| 四虎精品一区二区免费| 亚洲国产视频一区二区| 日韩尤物视频| 少妇人妻一区二区| 黄色小说综合网站| 国产成人精品久久二区二区91 | 欧美日韩在线一| 蜜桃视频网站在线| 91片在线免费观看| 91免费欧美精品| 最近免费中文字幕大全免费版视频| 五月精品视频| 中文字幕成人精品久久不卡| 熟女人妻在线视频| 99久久免费精品国产72精品九九| 欧美三级电影在线观看| 免费黄色福利视频| a级片在线免费| 亚洲男人天堂一区| 亚洲精品一区二| 牛牛澡牛牛爽一区二区| av激情综合网| 国产福利久久精品| 成人av手机在线| 国产精品一区二区黑丝| 国产精品久久久久一区二区 | 精品免费在线观看| 国产视频在线观看网站| 嫩草香蕉在线91一二三区| 久久精品欧美日韩| 久久综合给合久久狠狠色| 风流少妇一区二区三区91| 国产成人av电影在线观看| 成人免费福利在线| 国产视频在线观看免费| 精品系列免费在线观看| 国产美女久久精品香蕉69| 一区二区乱子伦在线播放| 日本特黄久久久高潮| 国产成人精品久久亚洲高清不卡| 国产成人在线观看网站| 99国产成+人+综合+亚洲欧美| 欧美激情中文字幕在线| 国产一级特黄视频| 99精品免费网| 日本不卡视频在线播放| 五月婷婷六月婷婷| 欧美aaaaa成人免费观看视频| 欧美亚洲午夜视频在线观看| 好吊色在线视频| 奇米色777欧美一区二区| 国产精品视频999| 中文字幕在线观看国产| 精品一区二区三区在线播放视频| 成人精品一区二区三区电影免费 | 日本一区二区在线看| 亚洲欧洲美洲在线综合| 能直接看的av| 91亚洲一区| 欧美精品中文字幕一区| 国产无遮挡又黄又爽在线观看| 在线免费高清一区二区三区| 青草青草久热精品视频在线网站| 无码人妻一区二区三区线| 蜜桃av一区二区在线观看| 91精品视频在线看| 好吊视频一二三区| 国产视频一区二区三区在线观看| 亚洲人体一区| 国产蜜臀av在线播放| 日韩欧美精品中文字幕| 日韩一级免费片| 伊人久久大香| 日韩电影在线观看中文字幕 | av免费播放网站| 婷婷伊人综合| 久久久爽爽爽美女图片| 久久影视中文字幕| 国产精品69毛片高清亚洲| 久草热久草热线频97精品| 成人77777| 亚洲蜜桃精久久久久久久| 欧美久久久久久久久久久久久| 日韩在线免费| 日韩精品一区二区三区蜜臀 | 亚洲色成人www永久网站| 九九久久精品视频| 麻豆成人av| 在线电影福利片| 一本大道久久a久久综合婷婷| 亚洲一二区在线观看| 亚洲电影男人天堂| 色综合久久悠悠| 中文在线免费看视频| 国产69精品久久久久毛片 | 红桃成人av在线播放| 欧美精品免费在线观看| 国产精品久久久久久久久夜色| 丰满岳乱妇一区二区三区| 亚洲精品在线免费看| 极品av在线| 666欧美在线视频| 日本二区在线观看| 亚洲人体偷拍| 99久久精品免费看国产四区 | 国产精品66部| 亚洲视频电影| 成人免费av电影| 精品调教chinesegay| 青青草手机在线视频| 蜜臀av性久久久久蜜臀aⅴ | 精品国产乱码久久久久久蜜臀网站| 久久无码av三级| 国产精彩视频一区二区| 麻豆精品国产| 久久久91精品国产| 曰批又黄又爽免费视频| 国产婷婷精品av在线| 国产欧美亚洲日本| 亚洲精品白浆| 日韩精品一区二区三区在线播放| 激情视频在线播放| 成人美女视频在线看| 男女超爽视频免费播放| 欧美变态网站| 热re99久久精品国产66热| 你懂的在线播放| 在线影院国内精品| avhd101老司机| 极品少妇xxxx精品少妇| a级网站在线观看| 婷婷视频一区二区三区| 久久久久久国产| 午夜在线观看视频18| 欧美性色19p| 91成人精品一区二区| 久久国产精品99久久人人澡| 国产又爽又黄ai换脸| 精品一区二区三区免费看| 欧美激情视频给我| 四虎在线视频| 欧美写真视频网站| 顶臀精品视频www| 成人午夜短视频| 韩国一区二区av| 婷婷另类小说| 精品伦精品一区二区三区视频| 美女福利一区二区| 久久精品小视频| 六月婷婷综合网| 日本道在线观看一区二区| 国产人妻精品一区二区三区不卡| 丰满少妇久久久久久久| 男人操女人逼免费视频| 欧美日韩国产在线观看网站| 91在线短视频| 不卡av影片| 久久综合色88| 色视频在线观看福利| 欧美精品亚洲二区| 欧美三日本三级少妇99| 国产精品日产欧美久久久久| 男人女人拔萝卜视频| 美女久久一区| 超碰97在线看| 国产精品一区二区av交换| 96pao国产成视频永久免费| 天堂中文av在线资源库| 欧美久久久精品| 六十路在线观看| 日韩欧美123| 日本免费精品视频| 亚洲成在线观看| 男人晚上看的视频| 久久精品日产第一区二区三区高清版| 久久精品无码一区二区三区毛片 | 一道本视频在线观看| 伊人天天综合| 国产精品久久成人免费观看| 国产日韩欧美一区二区三区| 成人蜜桃视频| 亚州欧美在线| 日韩av免费在线| 高h视频在线播放| 久久精品久久久久久| 你懂的在线看| 日韩精品在线视频美女| 亚洲第一页视频| 欧美日本韩国一区| 欧美 亚洲 另类 激情 另类 | 69sex久久精品国产麻豆| 久久高清免费| 色播亚洲视频在线观看| 青青草久久爱| 含羞草久久爱69一区| 亚洲国产中文在线二区三区免| 91精品美女在线| 国产精品高潮久久| 国产精品久久久久久久久久免费 | 国产精品国产三级国产aⅴ浪潮| 超碰在线97国产| 欧美激情一区二区三级高清视频| 黄色成人在线| 日韩一级黄色av| 在线观看精品一区二区三区| 亚洲日本中文字幕| 你懂的视频在线播放| 亚洲精品美女在线观看播放| 丰满大乳国产精品| 亚洲国产天堂久久综合网| 国模无码一区二区三区| 亚洲精品在线免费观看视频| 亚洲精品97久久中文字幕| 日韩视频在线永久播放| 99热这里是精品| 日韩免费一区二区三区在线播放| 国产av精国产传媒| 精品国产123| 人妻91麻豆一区二区三区| 亚洲а∨天堂久久精品喷水 | 国产高清不卡一区二区| 精品国产乱码久久久久夜深人妻| 国产精品系列在线播放| 欧美午夜精品一区二区| 99久久免费精品高清特色大片| 亚洲蜜桃精久久久久久久久久久久| 91亚洲精品久久久蜜桃| 欧美图片第一页| 国产人成一区二区三区影院| 国产精品18在线| 一区二区三区资源| 国产一卡二卡在线| 一本一本大道香蕉久在线精品 | a视频在线观看免费| 免费av在线一区| 成人性生交大片免费看网站| 97香蕉久久超级碰碰高清版| 久久久久久久| 国产女人精品视频| 欧美专区一区| 久久久久久国产精品免费免费| 国产成人一区二区三区影院| 一区二区在线中文字幕电影视频 | 337p粉嫩大胆噜噜噜鲁| 视频在线观看国产精品| 中文字幕永久有效| 高清不卡在线观看| 偷拍夫妻性生活| 综合中文字幕亚洲| 色播视频在线播放| 欧美日韩一区二区在线观看| 精品国产黄色片| 亚洲人精选亚洲人成在线| 黄色网在线看| 欧美在线视频免费观看| 日韩综合久久| 精品国产aⅴ麻豆| 久久人体视频| 国产美女主播在线| 日本欧美一区二区在线观看| 国产精品嫩草影视| 久久精品男人的天堂| 久操视频免费在线观看| 在线亚洲精品福利网址导航| 国产91绿帽单男绿奴| 中文字幕亚洲欧美一区二区三区| 国产精品—色呦呦| 国产精品免费久久久| 极品束缚调教一区二区网站| 一区二区欧美日韩| 亚洲欧美日韩国产一区二区| 亚洲欧美天堂在线| 国产清纯白嫩初高生在线观看91| 久久久久性色av无码一区二区| 91福利区一区二区三区| 欧美一区二区三区激情| 久久精品国亚洲| av一区在线播放| 久久青青草原一区二区| 国产一区日韩一区| 午夜宅男在线视频| 久久精品日产第一区二区三区高清版 | 亚洲人成影院在线观看| 日韩精品一区不卡| 亚洲加勒比久久88色综合| www在线观看播放免费视频日本| 国产成人在线视频| 羞羞色国产精品网站| 日韩欧美精品免费| 国产成人精品一区二| 蜜桃av.com| 欧美在线一区二区三区| 青青草手机在线| 久久免费观看视频| 51精品国产| 成人免费观看在线| 国产成人自拍高清视频在线免费播放| 日韩不卡av在线| 欧美午夜精品久久久久久超碰| 深夜福利视频在线免费观看| 久久久久久网站| jizz性欧美23| 日本黄色片一级片| 国产成人久久精品77777最新版本| 懂色av蜜臀av粉嫩av永久| 欧美影院一区二区三区| 理论视频在线| 国产精品成人一区| 日本久久精品| 日韩不卡一二三| 欧美国产精品中文字幕| 国产美女www| 在线播放日韩精品| 欧美综合影院| 中文网丁香综合网| 国产一区二区三区在线观看免费视频| 国产精品视频看看| 91精品免费观看| 午夜av在线播放| 动漫3d精品一区二区三区| 亚洲国产第一| aaaaaav| 欧美日韩亚洲激情| 国产免费av高清在线| 国产精品视频最多的网站| 99久久亚洲精品蜜臀| а 天堂 在线| 亚洲在线免费播放| 凸凹人妻人人澡人人添| 日本在线精品视频| 成人综合一区| 熟妇无码乱子成人精品| 午夜国产精品一区| 欧美在线观看在线观看| 国产精品美女www爽爽爽视频| 99视频精品视频高清免费| 中文字幕 欧美 日韩| 精品久久久久久久久中文字幕| 久色视频在线| 91久久国产综合久久91精品网站| 中文在线日韩| 中文字幕xxx| 欧美精选午夜久久久乱码6080| 91在线中文| 久久免费99精品久久久久久| 美女视频黄 久久| 久久视频免费看| 亚洲欧洲偷拍精品| 国产成人免费av一区二区午夜 | 久久亚洲高清| 另类欧美日韩国产在线| 国产精品18p| 中文字幕精品av| 国产成人在线中文字幕| 欧在线一二三四区| 亚洲视频每日更新| 瑟瑟在线观看| 91久久国产婷婷一区二区| 午夜亚洲性色福利视频| 五月天av网站| 亚洲欧洲在线观看| 91精品啪在线观看国产手机| 日本熟妇人妻xxxxx| 亚洲午夜免费视频| 在线观看av黄网站永久| 好吊色欧美一区二区三区四区 |