精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink的八種分區策略源碼解讀

開源
Flink包含8中分區策略,這8中分區策略(分區器)分別如下面所示,本文將從源碼的角度一一解讀每個分區器的實現方式。

[[399426]]

本文轉載自微信公眾號「大數據技術與數倉」,作者西貝。轉載本文請聯系大數據技術與數倉公眾號。

 Flink包含8中分區策略,這8中分區策略(分區器)分別如下面所示,本文將從源碼的角度一一解讀每個分區器的實現方式。

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

繼承關系圖

接口

名稱

ChannelSelector

實現

  1. public interface ChannelSelector<T extends IOReadableWritable> { 
  2.  
  3.     /** 
  4.      * 初始化channels數量,channel可以理解為下游Operator的某個實例(并行算子的某個subtask). 
  5.      */ 
  6.     void setup(int numberOfChannels); 
  7.  
  8.     /** 
  9.      *根據當前的record以及Channel總數, 
  10.      *決定應將record發送到下游哪個Channel。 
  11.      *不同的分區策略會實現不同的該方法。 
  12.      */ 
  13.     int selectChannel(T record); 
  14.  
  15.     /** 
  16.     *是否以廣播的形式發送到下游所有的算子實例 
  17.      */ 
  18.     boolean isBroadcast(); 

抽象類

名稱

StreamPartitioner

實現

  1. public abstract class StreamPartitioner<T> implements 
  2.         ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable { 
  3.     private static final long serialVersionUID = 1L; 
  4.  
  5.     protected int numberOfChannels; 
  6.  
  7.     @Override 
  8.     public void setup(int numberOfChannels) { 
  9.         this.numberOfChannels = numberOfChannels; 
  10.     } 
  11.  
  12.     @Override 
  13.     public boolean isBroadcast() { 
  14.         return false
  15.     } 
  16.  
  17.     public abstract StreamPartitioner<T> copy(); 

繼承關系圖

GlobalPartitioner

簡介

該分區器會將所有的數據都發送到下游的某個算子實例(subtask id = 0)

源碼解讀

  1. /** 
  2.  * 發送所有的數據到下游算子的第一個task(ID = 0) 
  3.  * @param <T> 
  4.  */ 
  5. @Internal 
  6. public class GlobalPartitioner<T> extends StreamPartitioner<T> { 
  7.     private static final long serialVersionUID = 1L; 
  8.  
  9.     @Override 
  10.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  11.         //只返回0,即只發送給下游算子的第一個task 
  12.         return 0; 
  13.     } 
  14.  
  15.     @Override 
  16.     public StreamPartitioner<T> copy() { 
  17.         return this; 
  18.     } 
  19.  
  20.     @Override 
  21.     public String toString() { 
  22.         return "GLOBAL"
  23.     } 

圖解

ShufflePartitioner

簡介

隨機選擇一個下游算子實例進行發送

源碼解讀

  1. /** 
  2.  * 隨機的選擇一個channel進行發送 
  3.  * @param <T> 
  4.  */ 
  5. @Internal 
  6. public class ShufflePartitioner<T> extends StreamPartitioner<T> { 
  7.     private static final long serialVersionUID = 1L; 
  8.  
  9.     private Random random = new Random(); 
  10.  
  11.     @Override 
  12.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  13.         //產生[0,numberOfChannels)偽隨機數,隨機發送到下游的某個task 
  14.         return random.nextInt(numberOfChannels); 
  15.     } 
  16.  
  17.     @Override 
  18.     public StreamPartitioner<T> copy() { 
  19.         return new ShufflePartitioner<T>(); 
  20.     } 
  21.  
  22.     @Override 
  23.     public String toString() { 
  24.         return "SHUFFLE"
  25.     } 

圖解

BroadcastPartitioner

簡介

發送到下游所有的算子實例

源碼解讀

  1. /** 
  2.  * 發送到所有的channel 
  3.  */ 
  4. @Internal 
  5. public class BroadcastPartitioner<T> extends StreamPartitioner<T> { 
  6.     private static final long serialVersionUID = 1L; 
  7.     /** 
  8.      * Broadcast模式是直接發送到下游的所有task,所以不需要通過下面的方法選擇發送的通道 
  9.      */ 
  10.     @Override 
  11.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  12.         throw new UnsupportedOperationException("Broadcast partitioner does not support select channels."); 
  13.     } 
  14.  
  15.     @Override 
  16.     public boolean isBroadcast() { 
  17.         return true
  18.     } 
  19.  
  20.     @Override 
  21.     public StreamPartitioner<T> copy() { 
  22.         return this; 
  23.     } 
  24.  
  25.     @Override 
  26.     public String toString() { 
  27.         return "BROADCAST"
  28.     } 

圖解

RebalancePartitioner

簡介

通過循環的方式依次發送到下游的task

源碼解讀

  1. /** 
  2.  *通過循環的方式依次發送到下游的task 
  3.  * @param <T> 
  4.  */ 
  5. @Internal 
  6. public class RebalancePartitioner<T> extends StreamPartitioner<T> { 
  7.     private static final long serialVersionUID = 1L; 
  8.  
  9.     private int nextChannelToSendTo; 
  10.  
  11.     @Override 
  12.     public void setup(int numberOfChannels) { 
  13.         super.setup(numberOfChannels); 
  14.         //初始化channel的id,返回[0,numberOfChannels)的偽隨機數 
  15.         nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels); 
  16.     } 
  17.  
  18.     @Override 
  19.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  20.         //循環依次發送到下游的task,比如:nextChannelToSendTo初始值為0,numberOfChannels(下游算子的實例個數,并行度)值為2 
  21.         //則第一次發送到ID = 1的task,第二次發送到ID = 0的task,第三次發送到ID = 1的task上...依次類推 
  22.         nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; 
  23.         return nextChannelToSendTo; 
  24.     } 
  25.  
  26.     public StreamPartitioner<T> copy() { 
  27.         return this; 
  28.     } 
  29.  
  30.     @Override 
  31.     public String toString() { 
  32.         return "REBALANCE"
  33.     } 

圖解

RescalePartitioner

簡介

基于上下游Operator的并行度,將記錄以循環的方式輸出到下游Operator的每個實例。

舉例: 上游并行度是2,下游是4,則上游一個并行度以循環的方式將記錄輸出到下游的兩個并行度上;上游另一個并行度以循環的方式將記錄輸出到下游另兩個并行度上。

若上游并行度是4,下游并行度是2,則上游兩個并行度將記錄輸出到下游一個并行度上;上游另兩個并行度將記錄輸出到下游另一個并行度上。

源碼解讀

  1. @Internal 
  2. public class RescalePartitioner<T> extends StreamPartitioner<T> { 
  3.     private static final long serialVersionUID = 1L; 
  4.  
  5.     private int nextChannelToSendTo = -1; 
  6.  
  7.     @Override 
  8.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  9.         if (++nextChannelToSendTo >= numberOfChannels) { 
  10.             nextChannelToSendTo = 0; 
  11.         } 
  12.         return nextChannelToSendTo; 
  13.     } 
  14.  
  15.     public StreamPartitioner<T> copy() { 
  16.         return this; 
  17.     } 
  18.  
  19.     @Override 
  20.     public String toString() { 
  21.         return "RESCALE"
  22.     } 

圖解

尖叫提示

Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。

StreamGraph:是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。

JobGraph:StreamGraph經過優化后生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。

ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是調度層最核心的數據結構。

物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task 后形成的“圖”,并不是一個具體的數據結構。

而StreamingJobGraphGenerator就是StreamGraph轉換為JobGraph。在這個類中,把ForwardPartitioner和RescalePartitioner列為POINTWISE分配模式,其他的為ALL_TO_ALL分配模式。代碼如下:

  1. if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) { 
  2.             jobEdge = downStreamVertex.connectNewDataSetAsInput( 
  3.                 headVertex, 
  4.  
  5.                // 上游算子(生產端)的實例(subtask)連接下游算子(消費端)的一個或者多個實例(subtask) 
  6.                 DistributionPattern.POINTWISE, 
  7.                 resultPartitionType); 
  8.         } else { 
  9.             jobEdge = downStreamVertex.connectNewDataSetAsInput( 
  10.                 headVertex, 
  11.                 // 上游算子(生產端)的實例(subtask)連接下游算子(消費端)的所有實例(subtask) 
  12.                 DistributionPattern.ALL_TO_ALL, 
  13.                 resultPartitionType); 
  14.         } 

ForwardPartitioner

簡介

發送到下游對應的第一個task,保證上下游算子并行度一致,即上有算子與下游算子是1:1的關系

源碼解讀

  1. /** 
  2.  * 發送到下游對應的第一個task 
  3.  * @param <T> 
  4.  */ 
  5. @Internal 
  6. public class ForwardPartitioner<T> extends StreamPartitioner<T> { 
  7.     private static final long serialVersionUID = 1L; 
  8.  
  9.     @Override 
  10.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  11.         return 0; 
  12.     } 
  13.  
  14.     public StreamPartitioner<T> copy() { 
  15.         return this; 
  16.     } 
  17.  
  18.     @Override 
  19.     public String toString() { 
  20.         return "FORWARD"
  21.     } 

圖解

尖叫提示

在上下游的算子沒有指定分區器的情況下,如果上下游的算子并行度一致,則使用ForwardPartitioner,否則使用RebalancePartitioner,對于ForwardPartitioner,必須保證上下游算子并行度一致,否則會拋出異常

  1. //在上下游的算子沒有指定分區器的情況下,如果上下游的算子并行度一致,則使用ForwardPartitioner,否則使用RebalancePartitioner 
  2.             if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { 
  3.                 partitioner = new ForwardPartitioner<Object>(); 
  4.             } else if (partitioner == null) { 
  5.                 partitioner = new RebalancePartitioner<Object>(); 
  6.             } 
  7.  
  8.             if (partitioner instanceof ForwardPartitioner) { 
  9.                 //如果上下游的并行度不一致,會拋出異常 
  10.                 if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) { 
  11.                     throw new UnsupportedOperationException("Forward partitioning does not allow " + 
  12.                         "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() + 
  13.                         ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() + 
  14.                         " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global."); 
  15.                 } 
  16.             } 

KeyGroupStreamPartitioner

簡介

根據key的分組索引選擇發送到相對應的下游subtask

源碼解讀

  1. /** 
  2.  * 根據key的分組索引選擇發送到相對應的下游subtask 
  3.  * @param <T> 
  4.  * @param <K> 
  5.  */ 
  6. @Internal 
  7. public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner { 
  8. ... 
  9.  
  10.     @Override 
  11.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  12.         K key
  13.         try { 
  14.             key = keySelector.getKey(record.getInstance().getValue()); 
  15.         } catch (Exception e) { 
  16.             throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); 
  17.         } 
  18.         //調用KeyGroupRangeAssignment類的assignKeyToParallelOperator方法,代碼如下所示 
  19.         return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels); 
  20.     } 
  21. ... 
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
  1. public final class KeyGroupRangeAssignment { 
  2. ... 
  3.  
  4.     /** 
  5.      * 根據key分配一個并行算子實例的索引,該索引即為該key要發送的下游算子實例的路由信息, 
  6.      * 即該key發送到哪一個task 
  7.      */ 
  8.     public static int assignKeyToParallelOperator(Object keyint maxParallelism, int parallelism) { 
  9.         Preconditions.checkNotNull(key"Assigned key must not be null!"); 
  10.         return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); 
  11.     } 
  12.  
  13.     /** 
  14.      *根據key分配一個分組id(keyGroupId) 
  15.      */ 
  16.     public static int assignToKeyGroup(Object keyint maxParallelism) { 
  17.         Preconditions.checkNotNull(key"Assigned key must not be null!"); 
  18.         //獲取key的hashcode 
  19.         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); 
  20.     } 
  21.  
  22.     /** 
  23.      * 根據key分配一個分組id(keyGroupId), 
  24.      */ 
  25.     public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { 
  26.  
  27.         //與maxParallelism取余,獲取keyGroupId 
  28.         return MathUtils.murmurHash(keyHash) % maxParallelism; 
  29.     } 
  30.  
  31.     //計算分區index,即該key group應該發送到下游的哪一個算子實例 
  32.     public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { 
  33.         return keyGroupId * parallelism / maxParallelism; 
  34.     } 
  35. ... 

圖解

CustomPartitionerWrapper

簡介

通過Partitioner實例的partition方法(自定義的)將記錄輸出到下游。

  1. public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> { 
  2.     private static final long serialVersionUID = 1L; 
  3.  
  4.     Partitioner<K> partitioner; 
  5.     KeySelector<T, K> keySelector; 
  6.  
  7.     public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) { 
  8.         this.partitioner = partitioner; 
  9.         this.keySelector = keySelector; 
  10.     } 
  11.  
  12.     @Override 
  13.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  14.         K key
  15.         try { 
  16.             key = keySelector.getKey(record.getInstance().getValue()); 
  17.         } catch (Exception e) { 
  18.             throw new RuntimeException("Could not extract key from " + record.getInstance(), e); 
  19.         } 
  20. //實現Partitioner接口,重寫partition方法 
  21.         return partitioner.partition(key, numberOfChannels); 
  22.     } 
  23.  
  24.     @Override 
  25.     public StreamPartitioner<T> copy() { 
  26.         return this; 
  27.     } 
  28.  
  29.     @Override 
  30.     public String toString() { 
  31.         return "CUSTOM"
  32.     } 

比如:

  1. public class CustomPartitioner implements Partitioner<String> { 
  2.       // key: 根據key的值來分區 
  3.       // numPartitions: 下游算子并行度 
  4.       @Override 
  5.       public int partition(String keyint numPartitions) { 
  6.          return key.length() % numPartitions;//在此處定義分區策略 
  7.       } 
  8.   } 

小結

本文主要從源碼層面對Flink的8中分區策略進行了一一分析,并對每一種分區策略給出了相對應的圖示,方便快速理解源碼。如果你覺得本文對你有用,可以關注我,了解更多精彩內容。

 

責任編輯:武曉燕 來源: 大數據技術與數倉
相關推薦

2023-03-10 15:31:45

2024-02-27 08:05:32

Flink分區機制數據傳輸

2020-07-08 12:05:55

Java線程池策略

2010-05-10 16:20:32

負載均衡策略

2009-09-25 14:20:28

Hibernate繼承映射

2011-06-09 13:48:48

程序員

2023-11-20 13:52:00

Redis數據庫

2025-08-04 02:21:00

2024-09-06 09:37:45

WebApp類加載器Web 應用

2025-07-18 11:26:38

2010-06-28 09:19:07

微軟開源

2009-10-23 14:34:00

光纖接入技術

2024-09-04 09:18:03

分區策略

2010-10-11 10:31:51

MySQL分區

2016-07-05 14:09:02

AndroidJAVA內存

2021-08-02 10:46:02

云計算用途

2010-08-24 09:49:44

2010-09-09 08:39:30

2023-03-30 09:06:20

HiveSpark大數據

2015-06-15 10:32:44

Java核心源碼解讀
點贊
收藏

51CTO技術棧公眾號

国产一级特黄毛片| 伊人av成人| 中文字幕免费观看| 日韩久久精品| 精品国产一区二区三区av性色| 成人午夜视频在线观看免费| 国产中文字幕在线| 国模一区二区三区白浆| 午夜欧美不卡精品aaaaa| 国产毛片久久久久久久| 日韩一区网站| 色偷偷久久一区二区三区| 婷婷视频在线播放| 日本v片在线免费观看| 国产一二精品视频| 日本91av在线播放| 久久久久久久久久久网| 成人影视亚洲图片在线| 亚洲精品一区二区三区精华液| 91日韩视频在线观看| 国产丝袜在线观看视频| 国产精品看片你懂得| 精品国产一区二区三区久久久久久 | 日韩久久精品视频| 天天综合久久| 亚洲欧洲一区二区三区久久| 丰满少妇一区二区三区专区| 草民电影神马电影一区二区| 婷婷国产v国产偷v亚洲高清| mm131午夜| 成年在线观看免费人视频| 成人激情免费网站| 成人免费午夜电影| 日韩欧美一级大片| 久久都是精品| 欧美精品video| 久久久久久视频| 成人精品视频| 亚洲香蕉av在线一区二区三区| 极品白嫩少妇无套内谢| 99精品女人在线观看免费视频| 一本一本大道香蕉久在线精品 | 亚洲奶水xxxx哺乳期| 中文字幕免费一区| 日本一区二区三区精品视频| 无码h黄肉3d动漫在线观看| 国产精品白丝jk黑袜喷水| 国产欧美日韩中文字幕| 日韩免费一级片| 国产一区二区三区站长工具| 日韩成人中文字幕在线观看| 激情av中文字幕| eeuss鲁片一区二区三区| 日韩欧美国产精品| 国产成人av片| 欧洲精品99毛片免费高清观看 | 国产精品久久久久久久av电影 | 国产aaaaaaaaa| 欧美在线色图| 色一区av在线| 91n在线视频| 亚洲高清资源在线观看| 欧美成人高清视频| 少妇久久久久久被弄高潮| 91精品国产乱码久久久久久| 九九久久综合网站| 欧美日韩中文视频| 亚洲欧美日韩国产一区二区| 88国产精品欧美一区二区三区| 久久久久99精品成人片三人毛片| 亚洲欧美激情诱惑| 日韩av片永久免费网站| 波多野结衣视频免费观看| 久久国产乱子精品免费女| 成人国产在线视频| 性欧美videos另类hd| 不卡视频在线观看| 欧美亚州在线观看| 在线中文资源天堂| 一区二区免费看| 你懂的av在线| av免费在线一区| 欧美情侣在线播放| 国产香蕉精品视频| 欧美日韩播放| 久久夜色精品国产亚洲aⅴ| 国产黄色片在线免费观看| 亚洲全部视频| 国产精品日本精品| 不卡的日韩av| 久久久亚洲午夜电影| 一区二区不卡在线| 韩日毛片在线观看| 欧美久久一二区| 久久精品女同亚洲女同13| 国产一区不卡| 久久91亚洲精品中文字幕奶水 | 欧美日韩国产综合久久| 日本成人在线免费| 精品国产欧美日韩| 欧美激情第三页| 国产成人无码av| 国产精品系列在线观看| 色综合久久久久久久久五月| 色www永久免费视频首页在线| 日韩欧美亚洲综合| www日本在线观看| 成人毛片免费看| 97视频色精品| www.四虎在线观看| 中文字幕乱码一区二区免费| 少妇人妻大乳在线视频| 色999韩欧美国产综合俺来也| 亚洲国产日韩欧美在线99| 国产精品酒店视频| 国产麻豆综合| 福利视频久久| 高潮毛片在线观看| 欧美午夜一区二区三区免费大片| 国产日韩视频一区| 午夜影院欧美| 国产精品日韩在线观看| 三级毛片在线免费看| 亚洲最快最全在线视频| 中文字幕一区久久| 欧美日一区二区| 欧美与黑人午夜性猛交久久久| 国产富婆一级全黄大片| 国产精品麻豆久久久| 日韩精品一区二区三区不卡 | 国产美女娇喘av呻吟久久| 日韩精品欧美在线| xx欧美xxx| 日韩风俗一区 二区| 欧美精品一级片| 国产精一品亚洲二区在线视频| 亚洲日本精品国产第一区| 裤袜国产欧美精品一区| 亚洲女人天堂成人av在线| 在线观看免费国产视频| 成人激情校园春色| 国产一二三在线视频| jizz国产精品| 欧美另类在线观看| 成人1区2区3区| 一区二区三区国产精品| 丰满少妇中文字幕| 亚洲成人三区| 福利视频久久| 超碰在线公开| 国产婷婷成人久久av免费高清| 国产又大又黑又粗免费视频| 99视频精品全部免费在线| 国产精品国产亚洲精品看不卡| 在线综合色站| 91国内在线视频| 欧美套图亚洲一区| 91电影在线观看| 亚洲一级片在线播放| 蜜臀va亚洲va欧美va天堂| 亚洲免费精品视频| 国产电影一区| 久久久久久免费精品| 婷婷在线观看视频| 日本精品免费观看高清观看| 一色道久久88加勒比一| 免费看日韩精品| 青少年xxxxx性开放hg| 久久在线观看| 97色在线观看| aⅴ在线视频男人的天堂| 7878成人国产在线观看| 日本免费在线播放| 91在线观看下载| 天天爱天天操天天干| 99久久亚洲精品| 国产精品一区二区免费看| 英国三级经典在线观看| 中文字幕在线视频日韩| 亚洲AV午夜精品| 欧美视频在线免费看| 欧美xxxx精品| 成人性生交大片免费看中文网站| 九一国产精品视频| 99九九热只有国产精品| 国产日韩欧美一区二区| 无人区在线高清完整免费版 一区二 | 国产精品狠色婷| 超碰免费公开在线| 亚洲精品色婷婷福利天堂| 亚洲影院一区二区三区| 亚洲va欧美va人人爽| 日韩福利在线视频| 懂色中文一区二区在线播放| av片中文字幕| 欧美日韩亚洲一区二区三区在线| 免费一区二区三区| 日本一区二区三区电影免费观看| 欧洲永久精品大片ww免费漫画| 日韩黄色影院| 亚洲免费中文字幕| 国产av无码专区亚洲av麻豆| 91国产精品成人| 91香蕉在线视频| 亚洲欧美日韩中文字幕一区二区三区 | 国产高清亚洲| 日本中文字幕久久看| 日本一本在线免费福利| 中文字幕在线观看日韩| 天天操天天干天天干| 69堂国产成人免费视频| 色老头在线视频| 婷婷六月综合亚洲| 日本a级片视频| 国产精品伦一区二区三级视频| 黄色国产在线观看| 成人综合在线观看| www.五月天色| 麻豆精品在线观看| 蜜臀av午夜一区二区三区 | 国产精品入口免费软件| 精品91在线| 肉大捧一出免费观看网站在线播放 | 国产一级特黄视频| 亚洲欧美日韩在线不卡| 婷婷综合在线视频| 日本一区二区高清| 三上悠亚ssⅰn939无码播放| av欧美精品.com| 少妇伦子伦精品无吗| 国产麻豆精品一区二区| 99re精彩视频| 奇米影视一区二区三区小说| 99re在线视频免费观看| 国产精品视区| 日韩精品视频久久| 午夜一区二区三区不卡视频| 国产精品久久久久7777| 黄色成人在线网站| 欧美成人精品免费| 国内一区二区三区| 国产九色porny| 国内精品亚洲| 日韩精品视频在线观看视频 | 国模视频一区二区| 丁香花在线影院| 久久久久久av| 俺来俺也去www色在线观看| 欧美夫妻性生活视频| 久草成色在线| 97av在线播放| 免费成人直播| 国产精品久久久久久久久| 国产成人精品一区二区三区在线 | 群体交乱之放荡娇妻一区二区 | 精品午夜久久| 亚洲欧美日韩在线综合| 91精品精品| 日本天堂免费a| 亚洲精品精选| 又色又爽又高潮免费视频国产| 日本va欧美va精品发布| 污污的网站免费| 国产精品18久久久久久久久| 国产av一区二区三区传媒| av资源站一区| 蜜臀久久99精品久久久久久| 国产精品国产三级国产aⅴ原创 | 欧美一区成人| 欧美午夜性视频| 久久精品综合| 日本中文字幕观看| 国产v综合v亚洲欧| 日本xxx在线播放| 国产精品另类一区| 国产无套在线观看| 欧美亚洲一区三区| www.爱爱.com| 精品亚洲男同gayvideo网站| 91九色在线porn| 欧美大学生性色视频| 松下纱荣子在线观看| 国产精品网站视频| xvideos.蜜桃一区二区| 日本在线观看一区| 欧美a级一区| 一本久道综合色婷婷五月| 九色porny丨国产精品| 国产大尺度视频| 国产欧美日本一区视频| 欧美黄色免费观看| 欧美在线视频你懂得| www.蜜臀av| 色诱女教师一区二区三区| 欧美性受ⅹ╳╳╳黑人a性爽| 日本a级片电影一区二区| 日本在线成人| 亚洲成人网上| 亚洲国产清纯| 色婷婷激情视频| 久久一区二区三区四区| 538精品在线观看| 91久久精品国产91性色tv| 国产高中女学生第一次| 亚洲人成在线观| 大桥未久在线播放| 91香蕉电影院| 欧美日韩在线二区| 国产a级一级片| 国产91丝袜在线18| 多男操一女视频| 一本久久精品一区二区| 男人的天堂a在线| 不卡中文字幕av| 久久国产三级| 欧美中日韩一区二区三区| 亚洲第一精品影视| 一级片黄色免费| 中文欧美字幕免费| 自拍偷拍18p| 亚洲女同精品视频| 在线天堂中文资源最新版| 国产福利久久| 黑人一区二区| 岛国大片在线免费观看| 成人免费在线观看入口| 这里只有久久精品视频| 亚洲美女黄色片| 忘忧草在线日韩www影院| 国产一区免费在线| 狠狠色丁香久久综合频道| www.桃色.com| 亚洲欧美一区二区三区久本道91| 亚洲天堂avav| 综合网中文字幕| www.久久| 亚洲成人网上| 九九久久精品视频 | 精品久久久久久中文字幕大豆网 | av高清在线免费观看| 国产成人在线免费观看| 欧美性猛交xxxxx少妇| 日韩亚洲欧美一区| 色呦呦在线资源| 99中文视频在线| 尤物网精品视频| 中文字幕a在线观看| 午夜精品一区二区三区电影天堂| 国产18精品乱码免费看| 欧美激情网站在线观看| 美女一区2区| 免费欧美一级视频| 国产亚洲欧洲997久久综合| 波多野结衣一二区| 日韩在线欧美在线| 伊人久久一区| 337p亚洲精品色噜噜狠狠p| 岛国av在线一区| 粉嫩aⅴ一区二区三区| 精品视频—区二区三区免费| 黄色综合网址| 在线视频91| 国产不卡免费视频| www日韩精品| 伊人一区二区三区久久精品| 欧美日韩大片在线观看| 欧美福利电影网| 欧美理论片在线播放| 国偷自产av一区二区三区小尤奈| 亚洲一区视频| 国产破处视频在线观看| 日韩欧美区一区二| 在线免费看h| 一区二区三区四区视频在线观看| 国精产品一区一区三区mba桃花| 欧美成欧美va| 日韩精品中文字幕视频在线| 日韩不卡在线| 99久久99久久精品| 26uuu久久天堂性欧美| 亚洲一区二区三区网站| 欧美日韩成人免费| 在线亚洲a色| 亚洲自拍第三页| 欧美性xxxx18| 国产精品一区二区三区视频网站| 国产日韩一区欧美| 久久精品久久综合| 国产污片在线观看| 深夜福利国产精品| 国产主播性色av福利精品一区| 天堂在线资源视频| 亚洲福利一区二区三区| aaa在线免费观看| 久久精品magnetxturnbtih| 美女视频网站黄色亚洲| 日本系列第一页| 久久天天躁日日躁| 国产不卡一二三区| 日本一级大毛片a一|