精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink 精確一次語義原理深度解析

大數據
本文將從原理到源碼,深入剖析Flink精確一次提交的實現機制,涵蓋Checkpoint流程、狀態管理、兩階段提交及與外部系統的集成等關鍵環節。

在分布式流處理系統中,"精確一次"(Exactly-Once, EO)語義是數據一致性的黃金標準,它確保每條數據在處理過程中不丟失、不重復,且只被處理一次。Apache Flink作為業界領先的流處理框架,通過Checkpoint機制、兩階段提交協議(Two-Phase Commit, 2PC)和狀態管理等核心技術,實現了端到端的精確一次語義。

本文將從原理到源碼,深入剖析Flink精確一次提交的實現機制,涵蓋Checkpoint流程、狀態管理、兩階段提交及與外部系統的集成等關鍵環節。

一、精確一次語義的核心挑戰與Flink的解決思路

1. 什么是精確一次語義?

在流處理場景中,數據處理的"一次"語義可分為三個層次:

  • 至少一次(At-Least-Once):數據至少被處理一次,可能因故障恢復導致重復處理。
  • 最多一次(At-Most-Once):數據最多被處理一次,可能因故障導致數據丟失。
  • 精確一次(Exactly-Once):數據嚴格被處理一次,無丟失、無重復。

端到端精確一次要求從數據源(如Kafka)→ Flink處理 → 外部存儲(如Kafka、HDFS)的整個鏈路都滿足EO語義。其核心挑戰在于:

  • 內部狀態一致性:Flink作業內部的算子狀態(如聚合結果、窗口數據)在故障恢復后需與輸入數據嚴格對齊。
  • 外部寫入一致性:寫入外部系統的數據需與Flink內部狀態同步提交,避免"內部狀態已提交,外部寫入失敗"或反之的情況。

2. Flink的解決思路

Flink通過以下三大核心技術實現端到端EO:

  • 分布式快照(Checkpoint):基于Chandy-Lamport算法的輕量級異步快照機制,定期保存作業全局狀態,為故障恢復提供一致性基準點。
  • 可插拔狀態后端(State Backend):管理狀態的存儲與訪問,支持內存、文件系統(HDFS)、RocksDB等多種存儲方式,確保狀態的持久化與高效恢復。
  • 兩階段提交協議(2PC):協調外部系統與Flink內部狀態的提交,實現"要么全部提交,要么全部回滾"的原子性。

二、Checkpoint機制:精確一次的基石

Checkpoint是Flink實現EO的核心機制,它通過定期生成作業全局狀態的快照,確保故障發生后能恢復到某個一致的狀態。Flink的Checkpoint基于Chandy-Lamport算法改進,具有輕量級、異步、增量等特點。

1. Checkpoint的核心概念

(1) Barrier(屏障)

Barrier是Checkpoint的核心觸發信號,它是一條特殊的數據記錄,由JobManager(作業協調器)注入到Source算子,并隨著數據流向下游算子廣播。Barrier將數據流分割為"Barrier之前的數據"和"Barrier之后的數據",算子收到Barrier后,會觸發當前狀態的快照。

Barrier的特性:

  • 廣播性:Barrier會廣播到所有并行算子實例,確保所有算子對齊到同一檢查點。
  • 對齊性:對于多輸入流算子(如KeyedJoin),需等待所有輸入流的Barrier到達后才能觸發快照,避免狀態不一致。
  • 異步性:快照過程異步執行,不阻塞數據流的正常處理。

(2) Checkpoint流程

一個完整的Checkpoint流程可分為以下步驟(以單并行度作業為例):

① 觸發Checkpoint:JobManager定期向所有Source算子發送TriggerCheckpoint消息,指定Checkpoint ID(如ckp-1)。

② Source注入Barrier:Source算子收到觸發消息后,停止處理新數據,在當前輸出位置插入Barrier(標記為ckp-1),然后將Barrier廣播給下游算子,同時將自身的狀態(如Kafka的offset)保存到狀態后端。

③ 中間算子快照與Barrier傳遞:中間算子(如Map、KeyedAgg)收到Barrier后,執行以下操作:

  • 暫停處理新數據:等待所有輸入流的Barrier到達(對齊階段)。
  • 快照狀態:將當前算子狀態(如聚合結果、窗口數據)異步保存到狀態后端。
  • 傳遞Barrier:向下游算子廣播Barrier,繼續處理新數據。

④ Sink算子確認與外部系統預提交:Sink算子收到Barrier后,快照自身狀態,并與外部系統交互(如Kafka事務預提交),向JobManager返回AcknowledgeCheckpoint消息。

⑤ Checkpoint完成:當所有算子都返回確認消息后,JobManager將Checkpoint標記為"已完成",并持久化Checkpoint元數據(如狀態存儲路徑、算子狀態偏移量)。

2. Barrier對齊:多輸入流的一致性保證

對于多輸入流算子(如KeyedJoin),Barrier對齊是確保狀態一致性的關鍵。假設算子有兩個輸入流(Input1和Input2),對齊過程如下:

  • 部分Barrier到達:假設Input1的Barrier先到達,算子會暫停處理Input1的數據,但繼續處理Input2的數據(因為其Barrier未到)。
  • 所有Barrier到達:當Input2的Barrier也到達后,算子觸發狀態快照,并向下游廣播Barrier。
  • 恢復處理:快照完成后,算子恢復處理兩個輸入流的數據。

對齊的意義:確保快照中包含的是"所有輸入流Barrier之前的數據"的處理結果,避免因部分輸入流延遲導致狀態不一致。

3. Checkpoint源碼解析

(1) Checkpoint觸發:JobManager端

Checkpoint的觸發由CheckpointCoordinator類(位于org.apache.flink.runtime.checkpoint包)負責。核心邏輯如下:

// CheckpointCoordinator.java
public void triggerCheckpoint(long timestamp, CheckpointProperties props) {
    // 1. 生成Checkpoint ID
    long checkpointID = checkpointIdCounter.getAndIncrement();
    
    // 2. 向所有Source任務發送TriggerCheckpoint消息
    for (ExecutionVertex vertex: tasksToTrigger) {
        if (vertex.getExecutionState() == ExecutionState.RUNNING) {
            vertex.triggerCheckpoint(checkpointID, timestamp, props);
        }
    }
}

CheckpointCoordinator是JobManager的核心組件,負責:

  • 定期觸發Checkpoint(通過ScheduledExecutorService調度)。
  • 跟蹤Checkpoint狀態(等待所有算子確認)。
  • 處理Checkpoint超時或失敗。

(2) Barrier注入與傳遞:StreamTask端

Source算子收到TriggerCheckpoint消息后,會在StreamTask(流處理任務基類)中注入Barrier。核心邏輯在StreamTask.performCheckpoint方法:

// StreamTask.java
privatevoidperformCheckpoint(CheckpointMetaData checkpointMetaData)throws Exception {
    // 1. 向下游廣播Barrier
    operatorChain.broadcastCheckpointBarrier(
        checkpointMetaData.getCheckpointId(),
        checkpointMetaData.getTimestamp(),
        checkpointMetaData.getCheckpointOptions()
    );
    
    // 2. 異步快照算子狀態
    Future<SnapshotResult> snapshotFuture = checkpointingOperation.snapshotState();
    
    // 3. 注冊回調,等待快照完成后通知JobManager
    snapshotFuture.thenAccept(snapshotResult -> {
        acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), snapshotResult);
    });
}
  • operatorChain.broadcastCheckpointBarrier:通過RecordWriter向所有輸出流寫入Barrier。
  • checkpointingOperation.snapshotState:調用算子的snapshotState方法,將狀態保存到狀態后端(如RocksDB)。

(3) Barrier對齊:StreamInputProcessor端

對于多輸入流算子,Barrier對齊由StreamInputProcessor(輸入處理器)實現。核心邏輯如下:

// StreamInputProcessor.java
public InputStatus pollNext()throws Exception {
    while (true) {
        // 1. 從輸入通道讀取數據或Barrier
        BufferOrEventbufferOrEvent= inputGate.getNextBufferOrEvent();
        
        if (bufferOrEvent.isBuffer()) {
            // 2. 如果是普通數據,檢查是否需要對齊
            if (checkAlignmentNeeded()) {
                // 當前通道的Barrier未到,暫停處理,緩存數據
                return InputStatus.MORE_AVAILABLE;
            } else {
                // 無需對齊,將數據交給算子處理
                return pushToOperator(bufferOrEvent.getBuffer());
            }
        } elseif (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            // 3. 如果是Barrier,觸發對齊邏輯
            handleBarrier((CheckpointBarrier) bufferOrEvent.getEvent());
        }
    }
}

privatevoidhandleBarrier(CheckpointBarrier barrier) {
    // 記錄當前通道的Barrier到達
    barrierHandler.addBarrier(barrier.getChannelIndex(), barrier.getId());
    
    // 檢查所有通道的Barrier是否都已到達
    if (barrierHandler.isAllBarriersReceived()) {
        // 觸發狀態快照
        triggerCheckpoint(barrier.getId());
        // 重置對齊狀態,繼續處理數據
        barrierHandler.reset();
    }
}
  • checkAlignmentNeeded:判斷是否有其他輸入流的Barrier未到,若未到則緩存當前數據。
  • handleBarrier:處理Barrier到達事件,當所有通道的Barrier都到達后,觸發快照。

三、狀態管理:一致性的持久化保障

狀態是Flink流處理的核心,用于存儲算子的中間結果(如聚合值、窗口數據)。Flink通過**狀態后端(State Backend)**管理狀態的存儲、訪問和持久化,確保Checkpoint時狀態能被正確保存,故障恢復時能被準確加載。

1. 狀態的分類

Flink中的狀態分為兩類:

(1) Keyed State(鍵控狀態)

僅用于KeyedStream,狀態與某個Key綁定,不同Key的狀態獨立存儲。常見類型:

  • ValueState:單值狀態(如某個Key的計數器)。
  • ListState:列表狀態(如某個Key的窗口數據)。
  • MapState:映射狀態(如某個Key的維度信息)。

(2) Operator State(算子狀態)

與算子并行實例綁定,不依賴Key。常見類型:

  • ListState:每個并行實例維護一個列表(如Kafka Source的offset列表)。
  • BroadcastState:廣播狀態(如配置數據,所有并行實例共享)。

2. 狀態后端的實現

Flink提供三種內置狀態后端,其核心差異在于存儲位置和快照機制:

狀態后端

存儲位置

快照方式

適用場景

MemoryStateBackend

TaskManager內存

同步全量快照

本地調試、小狀態作業

FsStateBackend

內存+文件系統

同步全量快照

中等規模狀態、需要容錯

RocksDBStateBackend

本地RocksDB

異步增量快照

大規模狀態、長窗口作業

RocksDBStateBackend是生產環境最常用的后端,其核心優勢:

  • 增量快照:僅保存上次Checkpoint后變化的狀態數據,減少快照開銷。
  • 本地存儲:狀態存儲在TaskManager的本地磁盤,避免內存OOM。
  • 異步持久化:快照過程異步執行,不影響數據流處理。

3. 狀態快照與恢復源碼解析

(1) 狀態快照:SnapshotStrategy

狀態后端的快照邏輯由SnapshotStrategy接口定義,以RocksDBStateBackend為例,其增量快照實現為RocksDBIncrementalSnapshotStrategy:

// RocksDBIncrementalSnapshotStrategy.java
public SnapshotResultSupplier snapshotState(long checkpointId)throws Exception {
    // 1. 獲取RocksDB的 SST文件列表(增量數據)
    List<StateMetaInfoSnapshot> metaInfoSnapshots = metaHandler.snapshot();
    List<StreamStateHandle> sstFiles = uploadSstFiles(checkpointId);
    
    // 2. 生成增量快照元數據(包含SST文件路徑、偏移量等)
    IncrementalRemoteKeyedStateHandlestateHandle=newIncrementalRemoteKeyedStateHandle(
        checkpointId,
        sstFiles,
        metaInfoSnapshots
    );
    
    // 3. 返回快照結果(包含狀態句柄)
    return SnapshotResultSupplier.of(stateHandle);
}
  • uploadSstFiles:將RocksDB的增量SST文件上傳到分布式文件系統(如HDFS)。
  • IncrementalRemoteKeyedStateHandle:描述增量快照的元數據,恢復時用于定位狀態文件。

(2) 狀態恢復:StateBackend

故障恢復時,StateBackend根據Checkpoint元數據加載狀態。核心邏輯在RocksDBStateBackend.restoreKeyedState:

// RocksDBStateBackend.java
publicvoidrestoreKeyedState(List<KeyedStateHandle> stateHandles)throws Exception {
    for (KeyedStateHandle stateHandle : stateHandles) {
        if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            // 1. 下載增量SST文件到本地
            IncrementalRemoteKeyedStateHandleincrementalHandle=
                (IncrementalRemoteKeyedStateHandle) stateHandle;
            downloadSstFiles(incrementalHandle.getSstFiles());
            
            // 2. 將SST文件導入RocksDB實例
            rocksDB.restore(incrementalHandle.getMetaInfoSnapshots());
        }
    }
}
  • downloadSstFiles:從分布式文件系統下載SST文件到TaskManager本地磁盤。
  • rocksDB.restore:將SST文件導入RocksDB,恢復狀態數據。

四、兩階段提交協議:端到端精確一次的核心

Flink通過Checkpoint實現了內部狀態的精確一次,但端到端EO還需協調外部系統的寫入。例如,若Sink算子將數據寫入Kafka,需確保"內部狀態提交"與"Kafka數據寫入"原子性:要么同時成功,要么同時失敗。

Flink基于**兩階段提交協議(2PC)**實現了這一目標,核心抽象是TwoPhaseCommitSinkFunction(位于org.apache.flink.streaming.api.functions.sink包)。

1. 兩階段提交的核心流程

TwoPhaseCommitSinkFunction將Sink操作分為兩個階段,與Checkpoint流程緊密耦合:

階段1:預提交(Pre-commit)

  • 觸發時機:Checkpoint過程中,算子快照狀態后。
  • 操作:將數據寫入外部系統的"臨時區域"(如Kafka的事務日志),但不提交,此時外部系統不可見數據。
  • 目的:確保數據已持久化到外部系統,但未對外生效,可隨時回滾。

階段2:提交(Commit)

  • 觸發時機:所有算子完成Checkpoint,JobManager通知"Checkpoint完成"后。
  • 操作:通知外部系統提交預提交的數據(如Kafka提交事務),數據對外可見。
  • 異常處理:若提交失敗,Flink會重試(通過恢復Checkpoint后重新提交)。

2. TwoPhaseCommitSinkFunction的核心方法

TwoPhaseCommitSinkFunction是一個抽象類,用戶需實現以下方法以適配外部系統:

方法名

作用

beginTransaction

開啟一個新事務(如Kafka的beginTransaction)

invoke

將數據寫入事務緩沖區(如Kafka的send方法,數據寫入事務日志)

preCommit

預提交事務(如Kafka的sendOffsetsToTransaction,提交offset到事務)

commit

提交事務(如Kafka的commitTransaction)

abort

回滾事務(如Kafka的abortTransaction)

3. Flink + Kafka端到端EO案例

以Flink消費Kafka數據,處理后寫入Kafka為例,說明端到端EO的實現:

(1) Source端:KafkaConsumer的offset管理

Flink的FlinkKafkaConsumer將Kafka的offset作為算子狀態存儲在OperatorState中,Checkpoint時持久化offset。恢復時,從Checkpoint加載offset,確保消費位置不丟失。

(2) Sink端:KafkaProducer的事務寫入

FlinkKafkaProducer繼承TwoPhaseCommitSinkFunction,實現Kafka的事務寫入:

// FlinkKafkaProducer.java(簡化版)
publicclassFlinkKafkaProducer<T> extendsTwoPhaseCommitSinkFunction<T, KafkaTransactionState, Void> {
    
    @Override
    protected KafkaTransactionState beginTransaction()throws Exception {
        // 1. 開啟Kafka事務
        KafkaProducer<byte[], byte[]> producer = getKafkaProducer();
        producer.beginTransaction();
        returnnewKafkaTransactionState(producer.getProducerId(), producer.getEpoch());
    }
    
    @Override
    protectedvoidinvoke(KafkaTransactionState transaction, T value, Context context)throws Exception {
        // 2. 將數據寫入事務緩沖區(未提交)
        producer.send(newProducerRecord<>(topic, value.getBytes()));
    }
    
    @Override
    protectedvoidpreCommit(KafkaTransactionState transaction)throws Exception {
        // 3. 預提交:將offset寫入事務日志(確保消費與寫入一致性)
        Map<TopicPartition, OffsetAndMetadata> offsets = getOffsetsToCommit();
        producer.sendOffsetsToTransaction(offsets, consumerGroupId);
    }
    
    @Override
    protectedvoidcommit(KafkaTransactionState transaction)throws Exception {
        // 4. 提交事務,數據對外可見
        producer.commitTransaction();
    }
    
    @Override
    protectedvoidabort(KafkaTransactionState transaction)throws Exception {
        // 5. 異常時回滾事務
        producer.abortTransaction();
    }
}

(3) 端到端流程時序

  • Checkpoint觸發:JobManager向Source發送TriggerCheckpoint。
  • Source快照offset:FlinkKafkaConsumer將當前offset保存到狀態后端,廣播Barrier。
  • Sink預提交:FlinkKafkaProducer收到Barrier后,調用preCommit,將數據寫入Kafka事務日志(未提交)。
  • Checkpoint完成:所有算子確認后,JobManager通知Sink"Checkpoint完成"。
  • Sink提交事務:FlinkKafkaProducer調用commit,Kafka提交事務,數據對外可見。
  • 故障恢復:若步驟5失敗,Flink從上次Checkpoint恢復,重新調用commit(Kafka事務冪等,重復提交無影響)。

4. 兩階段提交源碼解析

(1) 預提交與狀態快照

TwoPhaseCommitSinkFunction的snapshotState方法在Checkpoint時調用,觸發預提交并保存事務狀態:

// TwoPhaseCommitSinkFunction.java
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // 1. 調用用戶實現的preCommit(預提交事務)
    preCommit(currentTransaction);
    
    // 2. 將當前事務狀態保存到狀態后端(如Kafka的producerId、epoch)
    List<State<TxT>> transactions = new ArrayList<>();
    transactions.add(currentTransaction);
    state.clear();
    state.add(transactions);
}
  • preCommit:用戶實現的外部系統預提交邏輯(如Kafka的sendOffsetsToTransaction)。
  • state.add:將事務狀態(如Kafka事務標識)保存到OperatorState,故障恢復時用于重新提交。

(2) 提交與回滾

Checkpoint完成后,JobManager調用notifyCheckpointComplete通知Sink提交事務:

// TwoPhaseCommitSinkFunction.java
publicvoidnotifyCheckpointComplete(long checkpointId)throws Exception {
    // 1. 從狀態后端獲取Checkpoint對應的事務狀態
    Iterator<State<TxT>> iterator = state.get().iterator();
    if (iterator.hasNext()) {
        State<TxT> state = iterator.next();
        TxTtransaction= state.getTransaction();
        
        // 2. 調用用戶實現的commit(提交事務)
        commit(transaction);
        
        // 3. 清理已提交的事務狀態
        iterator.remove();
    }
}
  • commit:用戶實現的外部系統提交邏輯(如Kafka的commitTransaction)。
  • 若notifyCheckpointComplete未調用(如JobManager掛掉),恢復時會從狀態后端加載未提交的事務,重新調用commit。

五、精確一次的語義邊界與優化

1. 語義邊界

Flink的端到端精確一次語義需滿足以下條件:

  • Source支持可重置偏移量:如Kafka、Pulsar等,能從指定offset重新消費。
  • Sink支持事務或冪等寫入:如Kafka事務、HDFS冪等寫入、MySQL事務等。
  • 狀態后端支持持久化:如FsStateBackend、RocksDBStateBackend,確保狀態可恢復。
  • Checkpoint配置正確:需啟用Checkpoint(enableCheckpointing(true)),并設置CheckpointingMode.EXACTLY_ONCE。

2. 性能優化

精確一次語義會帶來額外開銷(如Barrier對齊、兩階段提交),可通過以下方式優化:

(1) 增量Checkpoint

使用RocksDBStateBackend,僅保存增量狀態數據,減少快照時間和網絡開銷。

(2) 對齊超時(Alignment Timeout)

對于低延遲要求高的場景,可設置setAlignmentTimeout,允許部分算子在對齊超時后跳過對齊(犧牲部分一致性換取低延遲)。

(3) Unaligned Checkpoint(非對齊Checkpoint)

Flink 1.11引入非對齊Checkpoint,跳過Barrier對齊階段,直接緩存所有輸入通道的數據并快照,大幅降低延遲(適合高延遲、高吞吐場景)。

六、總結

Flink的精確一次提交語義是Checkpoint機制、狀態管理、兩階段提交協議三者協同的結果:

  • Checkpoint:通過Barrier和分布式快照,實現內部狀態的一致性基準點。
  • 狀態管理:通過可插拔狀態后端,確保狀態的持久化與高效恢復。
  • 兩階段提交:協調外部系統與內部狀態的原子性提交,實現端到端EO。

從源碼層面看,CheckpointCoordinator負責全局協調,StreamTask實現Barrier傳遞與狀態快照,TwoPhaseCommitSinkFunction封裝外部系統的兩階段提交邏輯。這種分層設計使得Flink在保證嚴格一致性的同時,兼顧了靈活性和性能。

正是這些精巧的設計,讓Flink成為實時數倉、CEP、實時ETL等場景的首選流處理框架,為企業的實時數據處理提供了可靠的一致性保障。

責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2022-02-19 09:09:37

數倉Flink CP分布式

2011-08-12 09:30:02

MongoDB

2021-02-01 08:41:45

Flink語義數據

2022-02-20 10:47:54

Flink CP通用算法實時數倉

2021-06-02 07:07:09

Flink處理語義

2019-11-08 16:05:54

Promise前端鏈式調用

2011-11-15 13:34:22

蘋果iTunes Matc

2014-08-29 09:09:33

2021-05-26 11:06:06

Kubernetes網絡故障集群節點

2024-03-18 09:10:00

死鎖日志binlog

2011-06-28 10:41:50

DBA

2020-10-24 13:50:59

Python編程語言

2021-12-27 10:08:16

Python編程語言

2020-10-18 12:53:29

黑科技網站軟件

2020-03-10 07:51:35

面試諷刺標準

2020-03-18 13:07:16

華為

2017-01-23 12:40:45

設計演講報表數據

2019-08-19 08:01:50

Flink數據管理內存

2024-05-28 00:00:02

Java線程程序

2021-06-29 10:18:07

Kafka宕機系統
點贊
收藏

51CTO技術棧公眾號

91麻豆成人精品国产| 少妇大叫太粗太大爽一区二区| 看黄网站在线| 成人一区二区视频| 日本高清+成人网在线观看| 欧美做受高潮6| 亚洲ww精品| 亚洲成a人v欧美综合天堂| 欧美精品亚洲| 国产亲伦免费视频播放| 日韩视频在线一区二区三区 | 国产精品久久免费看| 91九色在线观看| 国产伦精品一区二区三区视频我| 久久福利综合| 日韩av资源在线播放| 97视频在线免费播放| 成人免费看片| 国产日韩欧美综合一区| 91在线短视频| 中文在线观看免费高清| 日韩视频不卡| 欧美剧在线观看| 国产高清一区二区三区四区| 成人性生交大片免费看96| 欧美综合久久久| 日韩av新片网| caopo在线| 国产日韩精品一区二区三区 | 国产极品久久久| 天堂成人国产精品一区| 亚州精品天堂中文字幕| 一区二区国产精品精华液| 免费av一区| 日韩大片免费观看视频播放| 女人高潮一级片| 日本免费久久| 疯狂做受xxxx欧美肥白少妇 | 色婷婷av在线| 中文字幕一区免费在线观看| 日本一区二区在线| 天堂中文在线8| 成人免费毛片高清视频| 97人人干人人| a级片在线播放| 久久国产精品免费| 国产精品视频导航| 久草视频在线免费| 丝袜国产日韩另类美女| 欧美最猛性xxxxx免费| 福利一区二区三区四区| 国产在线不卡| 久久久久久久久爱| 久久久久成人网站| 欧美性色综合| 欧美肥婆姓交大片| 久久这里只有精品国产| 亚洲天堂男人| 97在线看福利| 久久久久久久久久久久久久av| 亚洲东热激情| 2021国产精品视频| 久久国产黄色片| 国产精品美女久久久| 91精品国产91久久久久久吃药| 日韩免费一二三区| aa亚洲婷婷| 奇米4444一区二区三区| 无码人妻丰满熟妇精品| 日本欧美加勒比视频| 国产精品丝袜久久久久久不卡| 亚洲天堂自拍偷拍| 国产中文字幕精品| 99re在线视频观看| 欧美一级特黄aaaaaa大片在线观看| 成人免费av网站| 久久精品日产第一区二区三区| 青青草免费观看免费视频在线| 国产三级欧美三级| 免费看啪啪网站| 欧美videossex另类| 精品福利在线视频| 成人性视频欧美一区二区三区| 韩日一区二区| 日韩一级欧美一级| 亚洲av无码成人精品国产| 九九久久婷婷| 欧美成年人视频网站欧美| 色网站在线播放| 蜜桃久久久久久| 国产精品三区四区| 国产经典自拍视频在线观看| 亚洲男人的天堂在线aⅴ视频| 九一国产精品视频| 欧美日韩破处视频| 日韩精品久久久久久福利| 九九九视频在线观看| 国产一区亚洲| 国产精品中文字幕在线| 日本精品一区二区在线观看| 国产精品网曝门| 缅甸午夜性猛交xxxx| 亚洲成人高清| 亚洲人精选亚洲人成在线| 欧洲美女女同性互添| 老鸭窝亚洲一区二区三区| 亚洲综合最新在线| 精品欧美不卡一区二区在线观看| 亚洲欧美日韩在线| 欧美黄色一级片视频| 日韩欧美另类中文字幕| 在线日韩中文字幕| 69成人免费视频| 国产精品一区二区在线播放| 日韩免费av一区二区三区| 最近中文字幕免费mv2018在线| 污片在线观看一区二区| 九一精品在线观看| 深夜福利亚洲| 日韩电影免费观看中文字幕| av在线播放中文字幕| 国内精品福利| 国产日韩精品电影| 亚洲欧美国产高清va在线播放| 国产精品欧美精品| av之家在线观看| 九七电影院97理论片久久tvb| 亚洲成人av中文字幕| 国产又粗又硬又长又爽| 午夜在线播放视频欧美| 91久久极品少妇xxxxⅹ软件| 免费在线视频欧美| 日韩欧美精品网站| 99精品一区二区三区无码吞精| 日韩精品dvd| 国产91精品久久久久久久| 国产成人毛毛毛片| 中文字幕的久久| 欧美黄色性生活| 亚洲区小说区图片区qvod按摩| 欧美成人免费网| 中文字幕日本人妻久久久免费 | 男人操女人下面视频| 精品国产一区二区三区香蕉沈先生| 欧美黄色三级网站| 国产精品51麻豆cm传媒| 91麻豆精品视频| 久久av综合网| 亚洲三区欧美一区国产二区| xx视频.9999.com| 一区二区视频免费| 国产精品第一页第二页第三页| 国产精品亚洲a| 亚洲国产合集| 欧美一区在线直播| 免费的黄色av| 亚洲伊人伊色伊影伊综合网| avtt中文字幕| 欧美久久99| 99免费在线视频观看| 91麻豆一二三四在线| 欧美一级视频精品观看| 糖心vlog免费在线观看 | 久久精品视频在线观看免费| 欧美一级精品| 国产精品夜间视频香蕉| 在线免费看黄| 日韩欧美亚洲国产精品字幕久久久| 天天鲁一鲁摸一摸爽一爽| 激情综合网最新| 超碰97免费观看| 精品国模一区二区三区欧美 | 麻豆国产在线| 亚洲码在线观看| 男人天堂视频网| 国产精品少妇自拍| 欧美又黄又嫩大片a级| 精品成人免费| 蜜桃999成人看片在线观看| 一个人看的www视频在线免费观看| 亚洲国内精品视频| 男人天堂2024| 一区在线播放视频| 三大队在线观看| 亚洲精品孕妇| 日本午夜一区二区三区| 九九九精品视频| 色哟哟网站入口亚洲精品| www.五月激情| 欧美色图在线视频| 一区二区三区在线观看免费视频| 另类小说综合欧美亚洲| 黄色免费高清视频| 成人爽a毛片| 日本a级片电影一区二区| 国产福利在线观看| 91麻豆精品国产自产在线| 国产在线视频你懂的| 波多野结衣一区二区三区| 不卡的av中文字幕| 精品二区久久| 亚洲国产精品www| 日本一区二区乱| 8x拔播拔播x8国产精品| 超碰超碰在线| 亚洲乱码av中文一区二区| 在线视频欧美亚洲| 亚洲图片自拍偷拍| 精品欧美一区二区久久久| thepron国产精品| 国产高潮免费视频| 在线国产精品一区| 亚洲在线色站| 麻豆视频一区| av一区观看| 天堂久久午夜av| 高清在线视频日韩欧美| av电影在线观看网址| 精品国产不卡一区二区三区| 国产精品视频第一页| 欧美视频国产精品| 少妇久久久久久被弄高潮| 91色综合久久久久婷婷| 日本一本在线视频| 激情另类小说区图片区视频区| 内射国产内射夫妻免费频道| 婷婷综合伊人| 一区视频二区视频| 国产一区二区三区不卡视频网站| 97超碰资源| 四虎影视成人精品国库在线观看| 欧美放荡办公室videos4k| 国产最新在线| 中文字幕亚洲专区| 美国一级片在线免费观看视频| 日韩精品一区二区三区视频在线观看| 在线观看中文字幕视频| 亚洲成在人线免费| 五月综合色婷婷| 久久久亚洲国产美女国产盗摄| 台湾佬美性中文| 久久99精品国产.久久久久久| 尤蜜粉嫩av国产一区二区三区| 99亚洲一区二区| 国产美女永久无遮挡| 亚洲a在线视频| 一本一生久久a久久精品综合蜜| 欧美色图在线播放| 麻豆精品视频| 日韩影视在线观看| 好吊色欧美一区二区三区视频| 欧美日本三级| 国产精品主播视频| 少妇高潮一区二区三区99| 国产精品欧美风情| 日韩精选视频| 国产精品久久二区| 日韩毛片免费看| 成人一区二区电影| 久久久久久一区二区三区四区别墅 | 亚洲乱妇老熟女爽到高潮的片| 精品一区二区三区不卡| 日本不卡一区二区在线观看| 韩国理伦片一区二区三区在线播放 | 91九色最新地址| www.日韩一区| 欧美日韩国产综合久久| 中文字幕在线观看第二页| 91精品国产综合久久久久| 国产精品女同一区二区| 7878成人国产在线观看| 国产伦精品一区二区三区四区| 7777精品伊人久久久大香线蕉超级流畅 | 午夜免费精品视频| 久久国产精品第一页| 五月六月丁香婷婷| 国产69精品久久99不卡| 精品无码人妻少妇久久久久久| 久久综合久久综合久久综合| 成人黄色免费网址| 国产精品伦一区二区三级视频| 任你操精品视频| 亚洲欧美国产三级| 黄色一级片免费看| 午夜久久久影院| 五月婷婷六月婷婷| 欧美精品成人一区二区三区四区| va视频在线观看| 亚洲女成人图区| 日本成人在线播放| 欧美激情xxxxx| 伊伊综合在线| 国产精品看片资源| 粉嫩av一区二区| 欧美精品与人动性物交免费看| 成人精品久久| 国产亚洲精品久久久久久久| 久久动漫亚洲| 亚洲欧美日韩网站| 91网站视频在线观看| 日韩不卡av在线| 午夜精品久久久久久久久 | 欧美高清视频在线高清观看mv色露露十八| 国产精品爽爽久久| 亚洲加勒比久久88色综合| 日本三级在线视频| 午夜精品久久久久久久99黑人 | 久久精品中文字幕免费mv| 成人在线观看亚洲| 国产精品久久久久久久天堂| 亚洲国产中文在线| 日本欧美色综合网站免费| 中文字幕一区二区三区欧美日韩| 北条麻妃视频在线| 国产精品一区二区在线观看网站| 巨胸大乳www视频免费观看| 亚洲欧美国产毛片在线| 中国精品一区二区| 亚洲国产精品电影| 精品美女在线观看视频在线观看 | 欧美一级搡bbbb搡bbbb| 青梅竹马是消防员在线| 中文字幕亚洲无线码在线一区| sm捆绑调教国产免费网站在线观看| 欧洲精品久久久| 视频一区视频二区欧美| 一卡二卡3卡四卡高清精品视频| 伊人狠狠色j香婷婷综合| 欧美一区二区三区影院| 国产精品视频麻豆| 国产日产精品一区二区三区| 91精品国产综合久久香蕉麻豆 | 成人一区二区视频| 91porn在线视频| 欧美日韩久久不卡| 精品久久av| 国产精彩精品视频| 欧美激情影院| 国产精品一色哟哟| 国产主播一区二区三区| 国产在线一卡二卡| 91精品国产综合久久久久久久| 男人的天堂av高清在线| 97热精品视频官网| 婷婷亚洲成人| 国产黄页在线观看| 国产99久久久国产精品潘金| 91嫩草|国产丨精品入口| 日韩一区二区三区在线| 免费a在线看| 国产日本欧美在线观看 | 福利一区福利二区| 欧美三级日本三级| 日韩欧美亚洲另类制服综合在线| 色呦呦网站在线观看| 97人人模人人爽视频一区二区| 亚洲精品成人无限看| 亚洲午夜精品一区| 一区二区三区免费在线观看| 999久久久久| 欧美成人免费在线视频| 久久久精品区| 日本熟妇人妻xxxx| 成人免费视频免费观看| 99免费在线观看| 欧美成人精精品一区二区频| 超碰在线中文字幕| 国产91精品入口17c| 韩日在线一区| 中文字幕天堂av| 一本色道综合亚洲| 成年人在线免费观看| 国产精品一区二区久久| 欧美黄色精品| 日b视频在线观看| 色综合激情久久| 精品视频二区| 51国偷自产一区二区三区的来源| 欧美一区在线看| 亚洲欧美日韩偷拍| 欧美日韩国产一中文字不卡| 免费av在线电影| 国产精品美乳一区二区免费| 亚洲激情五月| 91av在线免费| 欧美色图天堂网| 国内精品久久久久国产| 亚洲最大的网站| 日韩国产欧美三级| 国产稀缺精品盗摄盗拍| 精品日本一线二线三线不卡| 写真福利精品福利在线观看| 中文字幕免费在线不卡| 国产91精品久久久久久久网曝门| 久久视频免费看| 在线精品国产欧美| 亚洲日本va| 999精品网站| 亚洲成a人v欧美综合天堂下载| 国产最新视频在线|