精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Flink SQL 四種 Join 方式詳解:原理、場景與實戰

大數據
本文將深入剖析每種 Join 的底層原理、適用場景,并通過完整代碼示例演示實戰用法,幫助讀者掌握 Flink SQL Join 的選型與優化。

一、引言:流處理中的 Join 挑戰與 Flink SQL 的解決方案

在數據處理領域,Join 是關聯多表數據的核心操作。批處理中(如 Hive、Spark SQL),Join 通常針對有限數據集,通過全量數據匹配即可完成。但在流處理場景中,數據是無限、實時、無界的,Join 操作面臨三大核心挑戰:

  • 數據延遲與亂序:流數據可能因網絡延遲、節點故障等原因亂序到達,如何確保 Join 的正確性?
  • 狀態管理:流 Join 需要存儲歷史數據用于后續匹配,如何高效管理狀態(內存/磁盤)并避免無限增長?
  • 實時性與準確性平衡:不同業務場景對延遲(如實時監控)和準確性(如賬單核對)要求不同,如何靈活適配?

Flink 作為業界領先的流處理引擎,其 SQL API 提供了四種核心 Join 方式,分別針對不同場景解決了上述問題:Regular Join(常規 Join)、Interval Join(區間 Join)、Temporal Join(時態 Join) 和 Lookup Join(維表 Join)。本文將深入剖析每種 Join 的底層原理、適用場景,并通過完整代碼示例演示實戰用法,幫助讀者掌握 Flink SQL Join 的選型與優化。

二、Regular Join:常規雙流 Join

1. 定義與核心思想

Regular Join 是 Flink SQL 中最基礎的 Join 類型,語法與標準 SQL 一致(如 INNER JOIN、LEFT JOIN)。它持續關聯兩條流的數據:當任一流收到新數據時,會掃描另一條流的所有歷史數據,生成匹配結果并更新下游。

核心特點:

  • 無時間限制:只要兩條流的數據滿足 Join 條件,無論時間差多大,都會關聯(例如,訂單流在 1 小時后收到用戶流的數據,仍會觸發 Join)。
  • 狀態持久化:兩條流的所有數據都會存儲在狀態中(基于 StateBackend),用于后續匹配。
  • 結果更新:下游會收到“插入”(Insert)、“更新”(Update)、“撤回”(Retract)三種類型的結果(取決于 Join 類型)。

2. 適用場景

Regular Join 適用于對數據完整性要求高、允許延遲且數據量較小的場景,例如:

  • 實時用戶畫像補全:將用戶行為流與用戶屬性流關聯,即使屬性數據延遲到達,也能更新行為記錄的用戶信息。
  • 離線數據實時修正:當歷史數據需要修正時(如用戶地址更新),通過 Regular Join 可將修正后的數據與實時流關聯,更新下游結果。

3. 實現原理

Regular Join 的底層基于 Flink 的 KeyedCoProcessFunction 算子實現,核心流程如下:

  • 數據分區:根據 Join 條件中的 ON 子句(如 user_id)對兩條流進行 KeyBy,確保相同 Key 的數據進入同一子任務。
  • 狀態存儲:每條流的數據以 MapState 形式存儲,Key 為 Join Key,Value 為對應數據的列表(支持多條數據,如同一用戶的多個訂單)。
  • 匹配與觸發:當任一流收到數據時,從另一條流的狀態中讀取相同 Key 的所有數據,進行匹配計算,并將結果發送到下游。例如,訂單流收到 order_id=1, user_id=101 時,會從用戶流狀態中查詢 user_id=101 的所有用戶數據,生成 Join 結果。
  • 狀態清理:默認情況下,Regular Join 的狀態永不清理(需手動配置 State TTL,否則可能導致 OOM)。

4. 語法說明

Regular Join 支持標準 SQL 的 Join 類型,語法如下:

SELECT 
    o.order_id, 
    u.user_name, 
    o.amount
FROM orders AS o  -- 訂單流
[INNER|LEFT|RIGHT|FULL] JOIN users AS u  -- 用戶流
ON o.user_id = u.user_id;

5. 詳細樣例代碼

(1) 場景描述

實時關聯訂單流(orders)和用戶流(users),通過 user_id 關聯,輸出訂單 ID、用戶名稱和訂單金額。假設數據可能延遲到達(如用戶信息在訂單產生后 10 分鐘才更新)。

(2) 環境準備

  • Flink 版本:1.18
  • 依賴:flink-java、flink-streaming-java、flink-clients、flink-connector-kafka(用于數據源)
  • 數據源:Kafka 中的 orders 和 users 主題,數據格式為 JSON。

(3) 步驟

步驟 1:創建 Kafka 表(DDL)

-- 訂單流表(Kafka 源)
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND-- 水位線(Regular Join 不強制依賴,但建議定義)
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

-- 用戶流表(Kafka 源)
CREATE TABLE users (
    user_id STRING,
    user_name STRING,
    age INT,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='users',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

-- 結果表(Print 控制臺輸出)
CREATE TABLE order_user_result (
    order_id STRING,
    user_name STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector'='print'
);

步驟 2:編寫 Regular Join SQL

-- INNER JOIN:僅輸出兩條流都匹配的數據
INSERT INTO order_user_result
SELECT
    o.order_id, 
    u.user_name, 
    o.amount
FROM orders AS o
INNERJOIN users AS u
ON o.user_id = u.user_id;

-- LEFT JOIN:輸出訂單流所有數據,用戶流不匹配時 user_name 為 NULL
-- INSERT INTO order_user_result
-- SELECT 
--     o.order_id, 
--     u.user_name, 
--     o.amount
-- FROM orders AS o
-- LEFT JOIN users AS u
-- ON o.user_id = u.user_id;

步驟 3:準備測試數據

  • 訂單流數據(orders 主題):
{"order_id":"order_1","user_id":"101","amount":100.50,"event_time":"2023-10-01 10:00:00"}
{"order_id":"order_2","user_id":"102","amount":200.00,"event_time":"2023-10-01 10:01:00"}
{"order_id":"order_3","user_id":"101","amount":150.75,"event_time":"2023-10-01 10:02:00"}
  • 用戶流數據(users 主題):
{"user_id":"101","user_name":"Alice","age":25,"update_time":"2023-10-01 10:00:30"}
{"user_id":"102","user_name":"Bob","age":30,"update_time":"2023-10-01 10:01:30"}
{"user_id":"101","user_name":"Alice_Update","age":26,"update_time":"2023-10-01 10:03:00"}  -- 用戶 101 信息更新

步驟 4:執行結果分析

INNER JOIN 結果:

+I[order_1, Alice, 100.50]  -- 訂單 1 與用戶 101 匹配(10:00:30 到達)
+I[order_2, Bob, 200.00]    -- 訂單 2 與用戶 102 匹配(10:01:30 到達)
+I[order_3, Alice, 150.75]  -- 訂單 3 與用戶 101 匹配(10:02:00 到達,此時用戶 101 名稱仍為 Alice)
-U[order_1, Alice, 100.50]  -- 撤回舊結果(用戶 101 信息更新)
+I[order_1, Alice_Update, 100.50]  -- 插入更新后的結果(訂單 1 關聯到新用戶名稱)
-U[order_3, Alice, 150.75]  -- 撤回訂單 3 的舊結果
+I[order_3, Alice_Update, 150.75]  -- 插入訂單 3 的新結果

關鍵觀察:

  • 當用戶 101 的信息更新(user_name 從 "Alice" 變為 "Alice_Update")時,所有關聯 user_id=101 的訂單(order_1、order_3)都會觸發結果更新(先撤回舊記錄,再插入新記錄)。
  • Regular Join 的狀態會保存所有歷史數據(如訂單流的所有訂單、用戶流的所有用戶),因此需配置 State TTL 避免內存溢出。

步驟 5:配置 State TTL(優化)

在 flink-conf.yaml 或 Table API 中配置狀態過期時間:

-- 在表級別配置 State TTL(僅對 Regular Join 生效)
CREATE TABLE orders_with_ttl (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset',
    'join.state.ttl'='1h'-- Join 狀態過期時間為 1 小時
);

6. 注意事項

  • 狀態膨脹風險:Regular Join 的狀態會無限增長(除非配置 TTL),僅適用于數據量小的場景。若數據量大,建議使用 Interval Join 或 Lookup Join。
  • 結果更新頻率:若上游數據頻繁更新(如用戶信息每秒修改),會導致下游產生大量更新記錄,需評估下游系統(如 Kafka、數據庫)的承受能力。

Join 類型選擇:

  • INNER JOIN:僅輸出匹配數據,狀態存儲兩條流的數據。
  • LEFT JOIN:輸出左流所有數據,右流不匹配時為 NULL,狀態存儲左流所有數據 + 右流匹配數據。
  • FULL JOIN:輸出兩條流所有數據,不匹配時為 NULL,狀態存儲兩條流所有數據(狀態最大,慎用)。

三、Interval Join:區間 Join

1. 定義與核心思想

Interval Join 是基于時間區間的雙流 Join,要求兩條流的數據在指定時間范圍內才能匹配。語法上通過 BETWEEN ... AND ... 定義時間區間,僅支持事件時間(Event Time)。

核心特點:

  • 時間區間限制:僅當兩條流的數據時間戳差值在 [lower_bound, upper_bound] 區間內時才關聯(例如,訂單流數據時間戳 ±5 分鐘內的支付流數據)。
  • 狀態自動清理:利用 Watermark 機制,當 Watermark 超過數據時間戳 + 上界時,自動清理過期數據,避免狀態膨脹。
  • 僅支持追加流:Interval Join 的輸入流必須是追加流(Append-only),不支持更新或撤回(因為時間區間內的數據一旦匹配,后續不會再更新)。

2. 適用場景

Interval Join 適用于有時間關聯要求、數據量大且需高效清理狀態的場景,例如:

  • 訂單支付關聯:訂單產生后,僅關聯 5 分鐘內的支付記錄(超時未支付則認為訂單失效)。
  • 物流軌跡匹配:快遞攬收后,僅關聯 1 小時內的運輸記錄(超時未運輸則觸發異常告警)。

3. 實現原理

Interval Join 的底層基于 KeyedCoProcessFunction 和 Watermark 機制,核心流程如下:

(1) 時間區間定義:通過 ON o.order_id = p.order_id AND o.event_time BETWEEN p.event_time - INTERVAL '5' MINUTE AND p.event_time + INTERVAL '5' MINUTE 定義時間范圍。

(2) 數據緩存與注冊定時器:

  • 當左流(訂單流)收到數據 o 時,將其存入 MapState(Key 為 Join Key,Value 為數據 + 時間戳),并注冊一個定時器(觸發時間為 o.event_time + upper_bound)。
  • 右流(支付流)同理,緩存數據并注冊定時器。

(3) 匹配與觸發:

  • 當左流收到數據 o 時,從右流狀態中查詢相同 Key 且時間戳在 [o.event_time - lower_bound, o.event_time + upper_bound] 內的數據,生成匹配結果。
  • 當右流收到數據 p 時,同理匹配左流數據。

(4) 狀態清理:當定時器觸發(即 Watermark ≥ 數據時間戳 + upper_bound),從狀態中刪除該數據(因為后續不會再有時間區間內的數據到達)。

4. 語法說明

Interval Join 的語法需在 ON 子句中添加時間區間條件:

SELECT 
    o.order_id, 
    p.payment_id, 
    o.amount
FROM orders AS o
JOIN payments AS p
ON o.order_id = p.order_id  -- Join 條件
AND o.event_time BETWEEN p.event_time - INTERVAL '5' MINUTE AND p.event_time + INTERVAL '5' MINUTE;  -- 時間區間

時間區間規則:

  • BETWEEN ... AND ... 定義的是左流時間相對于右流時間的范圍(或反之)。
  • 區間必須是對稱的(如 ±5 分鐘),且僅支持事件時間(需定義 WATERMARK)。

5. 詳細樣例代碼

(1) 場景描述

關聯訂單流(orders)和支付流(payments),僅匹配訂單產生后 5 分鐘內的支付記錄,輸出訂單 ID、支付 ID 和訂單金額。超時未支付的訂單不會出現在結果中。

(2) 環境準備

與 Regular Join 相同,使用 Kafka 作為數據源,新增支付流表 payments。

步驟 1:創建支付流表(DDL)

-- 支付流表(Kafka 源)
CREATE TABLE payments (
    payment_id STRING,
    order_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='payments',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='interval_join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

-- 結果表(Print 輸出)
CREATE TABLE order_payment_result (
    order_id STRING,
    payment_id STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector'='print'
);

步驟 2:編寫 Interval Join SQL

INSERT INTO order_payment_result
SELECT
    o.order_id, 
    p.payment_id, 
    o.amount
FROM orders AS o
JOIN payments AS p
ON o.order_id = p.order_id
AND o.event_time BETWEEN p.event_time -INTERVAL'5'MINUTEAND p.event_time +INTERVAL'5'MINUTE;

步驟 3:準備測試數據

訂單流數據(orders 主題):

{"order_id":"order_1","user_id":"101","amount":100.50,"event_time":"2023-10-01 10:00:00"}
{"order_id":"order_2","user_id":"102","amount":200.00,"event_time":"2023-10-01 10:05:00"}  -- 10:05:00 產生,需匹配 10:00:00-10:10:00 的支付記錄
{"order_id":"order_3","user_id":"103","amount":300.00,"event_time":"2023-10-01 10:15:00"}  -- 10:15:00 產生,需匹配 10:10:00-10:20:00 的支付記錄

支付流數據(payments 主題):

{"payment_id": "pay_1", "order_id": "order_1", "amount": 100.50, "event_time": "2023-10-01 10:03:00"}  -- 匹配 order_1(10:00:00 ±5 分鐘內)
{"payment_id": "pay_2", "order_id": "order_2", "amount": 200.00, "event_time": "2023-10-01 10:12:00"}  -- 匹配 order_2(10:05:00 ±5 分鐘內,10:12:00 在 10:00:00-10:10:00 外?不,區間是 o.event_time BETWEEN p.event_time -5min AND p.event_time +5min,即 p.event_time -5min ≤ o.event_time ≤ p.event_time +5min → 10:07:00 ≤ o.event_time ≤ 10:17:00,order_2 的 10:05:00 不在此區間!需調整區間定義)

修正時間區間邏輯:原 SQL 中 o.event_time BETWEEN p.event_time -5min AND p.event_time +5min 表示“訂單時間在支付時間 ±5 分鐘內”,但實際需求是“支付時間在訂單時間 +5 分鐘內”。因此需調整為:

AND p.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '5' MINUTE;  -- 支付時間在訂單時間后 5 分鐘內

重新測試支付數據:

{"payment_id":"pay_1","order_id":"order_1","amount":100.50,"event_time":"2023-10-01 10:03:00"}  -- 10:03:00 在 order_1 的 10:00:00-10:05:00 內,匹配
{"payment_id":"pay_2","order_id":"order_2","amount":200.00,"event_time":"2023-10-01 10:08:00"}  -- 10:08:00 在 order_2 的 10:05:00-10:10:00 內,匹配
{"payment_id":"pay_3","order_id":"order_3","amount":300.00,"event_time":"2023-10-01 10:21:00"}  -- 10:21:00 超出 order_3 的 10:15:00-10:20:00,不匹配

步驟 4:執行結果分析

Interval Join 結果:

+I[order_1, pay_1, 100.50]  -- order_1 與 pay_1 匹配(10:03:00 在 10:00:00-10:05:00 內)
+I[order_2, pay_2, 200.00]  -- order_2 與 pay_2 匹配(10:08:00 在 10:05:00-10:10:00 內)

關鍵觀察:

  • order_3 的支付記錄 pay_3 因超時(10:21:00 > 10:15:00 +5 分鐘)未匹配,不會出現在結果中。
  • 狀態清理:當 Watermark 超過 order_1.event_time +5分鐘(即 10:05:00)時,order_1 的數據會從狀態中刪除,不再參與后續匹配。

6. 注意事項

  • 僅支持事件時間:Interval Join 必須基于事件時間,需在表 DDL 中定義 WATERMARK。
  • 時間區間方向:需明確是“左流時間在右流時間區間內”還是“右流時間在左流時間區間內”,避免邏輯錯誤。
  • 不支持 OUTER JOIN:Flink SQL 的 Interval Join 僅支持 INNER JOIN,不支持 LEFT/RIGHT JOIN(因為時間區間限制可能導致一端數據無法匹配,而 OUTER JOIN 需輸出未匹配數據,與狀態清理機制沖突)。
  • 數據亂序影響:若數據亂序嚴重(如 Watermark 延遲較大),可能導致已匹配的數據因狀態清理而無法關聯,需合理設置 Watermark 延遲(如 WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE)。

四、Temporal Join:時態 Join

1. 定義與核心思想

Temporal Join 是基于時間版本的流與維度表 Join,用于關聯流表(如訂單流)和 changelog 流(如用戶維度表的變更記錄),獲取流表數據發生時間點的維度表版本。語法上通過 FOR SYSTEM_TIME AS OF 指定時間點。

核心特點:

  • 時間版本匹配:根據流表的時間戳(事件時間/處理時間),匹配維度表在對應時間點的版本(而非最新版本)。
  • 支持 changelog 流:維度表必須是 changelog 流(包含 INSERT、UPDATE、DELETE),通常通過 CDC(Change Data Capture)工具(如 Debezium)同步數據庫變更。
  • 主鍵要求:維度表必須定義主鍵(PRIMARY KEY),用于唯一標識不同版本的數據。

2. 適用場景

Temporal Join 適用于需要歷史版本數據關聯的場景,例如:

訂單關聯用戶歷史版本:訂單產生時,用戶可能是“普通會員”,后續升級為“VIP”,需關聯訂單發生時的用戶等級(而非當前等級)。

審計日志分析:關聯操作日志與配置表的歷史版本,還原操作發生時的配置信息。

3. 實現原理

Temporal Join 的底層基于 KeyedCoProcessFunction 和版本狀態管理,核心流程如下:

  • 版本狀態存儲:維度表(changelog 流)的數據以 MapState 存儲,Key 為主鍵,Value 為“時間戳 → 數據”的映射(即每個主鍵對應多個版本的數據,按時間戳排序)。
  • 時間點匹配:當流表收到數據 s(時間戳為 t)時,從維度表狀態中查詢主鍵對應的版本列表,找到時間戳 ≤ t 的最新版本(即 max(version_time) ≤ t)。
  • 結果生成:將流表數據 s 與匹配的維度表版本數據關聯,輸出結果。
  • 版本清理:通過 State TTL 清理過期的版本數據(如僅保留最近 7 天的版本)。

4. 語法說明

Temporal Join 的語法需在 Join 條件中添加 FOR SYSTEM_TIME AS OF:

SELECT 
    o.order_id, 
    u.user_name, 
    u.level  -- 用戶等級(歷史版本)
FROM orders AS o
JOIN users FOR SYSTEM_TIME AS OF o.event_time AS u  -- 關聯訂單發生時間點的用戶版本
ON o.user_id = u.user_id;

關鍵語法點:

  • FOR SYSTEM_TIME AS OF o.event_time:指定以流表 orders 的 event_time 為時間點,匹配維度表 users 的版本。
  • 支持事件時間(o.event_time)和處理時間(PROCTIME()),事件時間更常用(需定義 WATERMARK)。

5. 詳細樣例代碼

(1) 場景描述

關聯訂單流(orders)和用戶維度表(users_cdc),獲取訂單發生時用戶的等級(level)。用戶維度表通過 CDC 同步 MySQL 的變更(包含用戶等級更新)。

(2) 環境準備

  • 依賴:flink-connector-mysql-cdc(用于 MySQL CDC 源)
  • 數據源:訂單流(Kafka)、用戶維度表(MySQL CDC)。

步驟 1:創建訂單流表(Kafka)

CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='temporal_join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

步驟 2:創建用戶維度表(MySQL CDC)

CREATE TABLE users_cdc (
    user_id STRING,
    user_name STRING,
    level STRING,  -- 用戶等級(普通、VIP)
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time -INTERVAL'5'SECOND,
    PRIMARY KEY (user_id) NOT ENFORCED  -- 主鍵(必須定義)
) WITH (
    'connector'='mysql-cdc',
    'hostname'='localhost',
    'port'='3306',
    'username'='root',
    'password'='123456',
    'database-name'='flink_test',
    'table-name'='users',
    'server-time-zone'='Asia/Shanghai'
);

步驟 3:編寫 Temporal Join SQL

-- 結果表(Print 輸出)
CREATE TABLE order_user_level_result (
    order_id STRING,
    user_name STRING,
    level STRING,
    order_time TIMESTAMP(3)
) WITH (
    'connector'='print'
);

-- Temporal Join:關聯訂單發生時間點的用戶版本
INSERT INTO order_user_level_result
SELECT
    o.order_id, 
    u.user_name, 
    u.level,
    o.event_time AS order_time
FROM orders AS o
JOIN users_cdc FORSYSTEM_TIMEASOF o.event_time AS u
ON o.user_id = u.user_id;

步驟 4:準備測試數據

MySQL 用戶表初始數據:

-- flink_test.users 表
+---------+------------+--------+---------------------+
| user_id | user_name  | level  | update_time         |
+---------+------------+--------+---------------------+
| 101     | Alice      | 普通   | 2023-10-01 09:00:00 |
| 102     | Bob        | VIP    | 2023-10-01 09:00:00 |
+---------+------------+--------+---------------------+

訂單流數據(orders 主題):

{"order_id": "order_1", "user_id": "101", "amount": 100.50, "event_time": "2023-10-01 10:00:00"}  -- 10:00:00 的訂單,用戶 101 此時等級為“普通”
{"order_id": "order_2", "user_id": "102", "amount": 200.00, "event_time": "2023-10-01 10:05:00"}  -- 10:05:00 的訂單,用戶 102 此時等級為“VIP”

MySQL 用戶表更新數據(模擬 CDC 變更):

-- 2023-10-01 10:02:00,用戶 101 升級為 VIP
UPDATE users SET level = 'VIP', update_time = '2023-10-01 10:02:00' WHERE user_id = '101';

步驟 5:執行結果分析

Temporal Join 結果:

+I[order_1, Alice, 普通, 2023-10-01 10:00:00]  -- order_1 發生時(10:00:00),用戶 101 等級為“普通”(10:02:00 的升級不影響)
+I[order_2, Bob, VIP, 2023-10-01 10:05:00]      -- order_2 發生時(10:05:00),用戶 102 等級為“VIP”

關鍵觀察:

  • order_1 關聯的是用戶 101 在 10:00:00 的版本(“普通”),即使 10:02:00 升級為“VIP”,也不會影響歷史訂單的關聯結果。
  • 維度表的狀態存儲了用戶 101 的兩個版本(09:00:00 的“普通”和 10:02:00 的“VIP”),通過 FOR SYSTEM_TIME AS OF 匹配對應時間點的版本。

6. 注意事項

  • 維度表主鍵:必須定義 PRIMARY KEY,否則無法區分不同版本的數據。
  • CDC 數據格式:維度表需是 changelog 流(如 MySQL CDC、Postgres CDC),支持 INSERT、UPDATE、DELETE 操作。
  • 時間類型選擇:優先使用事件時間(FOR SYSTEM_TIME AS OF o.event_time),處理時間(FOR SYSTEM_TIME AS OF PROCTIME())會關聯當前最新的維度表版本,無法獲取歷史版本。
  • 狀態管理:維度表的版本狀態會隨時間增長,需配置 State TTL(如 'changelog.state.ttl' = '7d')清理過期版本。

五、Lookup Join:維表 Join

1. 定義與核心思想

Lookup Join 是流表與外部維表(如 MySQL、Redis、HBase)的實時查詢 Join,當流表收到數據時,通過 Join Key 查詢外部維表,獲取關聯數據。語法上通過 FOR SYSTEM_TIME AS OF 標記(與 Temporal Join 類似,但底層實現不同)。

核心特點:

  • 外部存儲依賴:維表存儲在外部系統(非 Flink 狀態),如 MySQL(關系型數據庫)、Redis(緩存)、HBase(列式存儲)。
  • 實時查詢:流表每條數據都會觸發一次外部維表的查詢(同步或異步),無需預先加載維表數據。
  • 無狀態存儲:Flink 不存儲維表數據(僅緩存查詢結果),適用于維表數據量大或頻繁變更的場景。

2. 適用場景

Lookup Join 適用于維表數據量大、需實時查詢的場景,例如:

  • 訂單關聯商品信息:商品信息存儲在 MySQL 中(百萬級數據),訂單流實時查詢 MySQL 獲取商品名稱、價格。
  • 實時風控:規則存儲在 Redis 中,用戶行為流實時查詢 Redis 獲取風控規則(規則頻繁更新)。

3. 實現原理

Lookup Join 的底層基于 LookupFunction(同步)或 AsyncLookupFunction(異步),核心流程如下:

(1) 流表數據觸發:當流表收到數據 s 時,提取 Join Key(如 product_id)。

(2) 外部維表查詢:

  • 同步查詢:調用 LookupFunction 的 lookup 方法,阻塞式查詢外部維表(如 MySQL 的 SELECT * FROM products WHERE product_id = ?),返回關聯數據。
  • 異步查詢:調用 AsyncLookupFunction 的 asyncLookup 方法,通過異步 IO(如 Netty)查詢外部維表,避免阻塞流處理線程(推薦,吞吐量更高)。
  • 3結果生成:將流表數據 s 與查詢到的維表數據關聯,輸出結果(若維表無匹配數據,根據 Join 類型輸出 NULL 或跳過)。
  • 緩存優化(可選):為減少外部存儲壓力,可在 LookupFunction 中添加緩存(如 Guava Cache),緩存查詢結果(需設置 TTL 和大小限制)。

4. 語法說明

Lookup Join 的語法與 Temporal Join 類似,但維表是外部存儲的連接器(如 JDBC、Redis):

SELECT 
    o.order_id, 
    p.product_name, 
    o.amount
FROM orders AS o
JOIN products FOR SYSTEM_TIME AS OF o.event_time AS p  -- 關聯外部維表 products
ON o.product_id = p.product_id;

關鍵語法點:

  • FOR SYSTEM_TIME AS OF o.event_time:標記為 Lookup Join(語義上表示“查詢當前時間點的維表數據”,實際取決于外部存儲的查詢邏輯)。
  • 維表連接器需支持 Lookup 功能(如 JDBC 連接器的 'lookup.cache.max-rows' 參數)。

5. 詳細樣例代碼

(1) 場景描述

關聯訂單流(orders)和 MySQL 維表(products),實時查詢商品名稱(product_name)。訂單流數據量較大(每秒 1000 條),商品維表數據量 10 萬條,需異步查詢優化性能。

(2) 環境準備

? 依賴:flink-connector-jdbc(用于 JDBC 維表連接)

? 數據源:訂單流(Kafka)、商品維表(MySQL)。

步驟 1:創建訂單流表(Kafka)

CREATE TABLE orders (
    order_id STRING,
    product_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'properties.group.id'='lookup_join_group',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

步驟 2:創建商品維表(MySQL JDBC)

CREATE TABLE products (
    product_id STRING,
    product_name STRING,
    price DECIMAL(10, 2),
    PRIMARY KEY (product_id) NOT ENFORCED  -- 主鍵(用于查詢)
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://localhost:3306/flink_test',
    'username'='root',
    'password'='123456',
    'table-name'='products',
    'lookup.cache.max-rows'='10000',  -- 緩存最大行數
    'lookup.cache.ttl'='1h',          -- 緩存 TTL(1 小時)
    'lookup.max-retries'='3'           -- 查詢失敗重試次數
);

步驟 3:編寫 Lookup Join SQL

-- 結果表(Print 輸出)
CREATE TABLE order_product_result (
    order_id STRING,
    product_name STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector'='print'
);

-- Lookup Join:關聯 MySQL 商品維表
INSERT INTO order_product_result
SELECT
    o.order_id, 
    p.product_name, 
    o.amount
FROM orders AS o
JOIN products FORSYSTEM_TIMEASOF o.event_time AS p
ON o.product_id = p.product_id;

步驟 4:準備測試數據

MySQL 商品維表數據:

-- flink_test.products 表
+------------+--------------+-------+
| product_id | product_name | price |
+------------+--------------+-------+
| P001       | 手機         | 5999  |
| P002       | 電腦         | 8999  |
+------------+--------------+-------+

訂單流數據(orders 主題):

{"order_id":"order_1","product_id":"P001","amount":5999.00,"event_time":"2023-10-01 10:00:00"}
{"order_id":"order_2","product_id":"P002","amount":8999.00,"event_time":"2023-10-01 10:00:01"}
{"order_id":"order_3","product_id":"P003","amount":100.00,"event_time":"2023-10-01 10:00:02"}  -- P003 不存在,LEFT JOIN 時 product_name 為 NULL

步驟 5:執行結果分析

Lookup Join 結果(INNER JOIN):

+I[order_1, 手機, 5999.00]  -- P001 查詢成功,關聯商品名稱“手機”
+I[order_2, 電腦, 8999.00]  -- P002 查詢成功,關聯商品名稱“電腦”

若改為 LEFT JOIN:

INSERT INTO order_product_result
SELECT 
    o.order_id, 
    p.product_name, 
    o.amount
FROM orders AS o
LEFT JOIN products FOR SYSTEM_TIME AS OF o.event_time AS p
ON o.product_id = p.product_id;

結果:

+I[order_1, 手機, 5999.00]
+I[order_2, 電腦, 8999.00]
+I[order_3, NULL, 100.00]  -- P003 在維表中不存在,product_name 為 NULL

步驟 6:異步查詢優化(Java API 示例)

若需更高性能,可通過 Java API 實現異步 Lookup Join(Flink SQL 默認同步,需自定義連接器):

// 自定義異步 Lookup Function(簡化示例)
publicclassAsyncMySQLLookupFunctionextendsAsyncTableFunction<Row> {

    privatefinal String url;
    privatefinal String username;
    privatefinal String password;
    privatefinal String tableName;
    private ExecutorService executorService;

    publicAsyncMySQLLookupFunction(String url, String username, String password, String tableName) {
        this.url = url;
        this.username = username;
        this.password = password;
        this.tableName = tableName;
    }

    @Override
    publicvoidopen(FunctionContext context)throws Exception {
        executorService = Executors.newFixedThreadPool(10);  // 異步線程池
    }

    publicvoideval(CompletableFuture<Collection<Row>> future, Object... keys) {
        StringproductId= (String) keys[0];
        executorService.submit(() -> {
            try (Connectionconn= DriverManager.getConnection(url, username, password);
                 PreparedStatementstmt= conn.prepareStatement(
                     "SELECT product_name, price FROM " + tableName + " WHERE product_id = ?")) {
                stmt.setString(1, productId);
                ResultSetrs= stmt.executeQuery();
                if (rs.next()) {
                    StringproductName= rs.getString("product_name");
                    doubleprice= rs.getDouble("price");
                    future.complete(Collections.singletonList(Row.of(productName, price)));
                } else {
                    future.complete(Collections.emptyList());  // 無匹配數據
                }
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
    }

    @Override
    publicvoidclose()throws Exception {
        executorService.shutdown();
    }
}

6. 注意事項

(11) 異步 IO 優化:同步查詢會阻塞流處理線程,導致吞吐量下降,建議優先使用異步 IO(需外部存儲支持異步客戶端,如 MySQL 的異步驅動)。

(2) 緩存配置:合理設置 'lookup.cache.max-rows' 和 'lookup.cache.ttl',避免緩存穿透(大量查詢未緩存的數據)或緩存擊穿(緩存過期瞬間大量請求直達數據庫)。

(3) 維表更新延遲:Lookup Join 查詢的是外部維表的實時數據,若維表更新延遲(如 MySQL 主從同步延遲),可能導致關聯結果不準確。

(4) Join 類型選擇:

  • INNER JOIN:僅輸出維表匹配的數據,適用于強關聯場景(如訂單必須關聯商品)。
  • LEFT JOIN:輸出流表所有數據,維表不匹配時為 NULL,適用于弱關聯場景(如日志關聯維度信息)。

六、四種 Join 方式對比與選型

1. 核心特性對比

特性

Regular Join

Interval Join

Temporal Join

Lookup Join

Join 類型

雙流 Join

雙流 Join

流與 changelog 維表 Join

流與外部維表 Join

時間限制

有(時間區間)

有(時間點版本)

無(查詢實時數據)

狀態管理

存儲雙流所有數據(風險大)

存儲區間內數據(自動清理)

存儲維度表版本(需 TTL)

無狀態(依賴外部存儲)

支持 OUTER JOIN

是(INNER/LEFT/RIGHT/FULL)

否(僅 INNER)

是(LEFT/INNER)

是(LEFT/INNER)

適用數據量

小數據量

中大數據量

中等數據量(維度表)

大數據量(維表)

延遲

低(實時匹配)

低(實時匹配)

低(實時匹配)

中(依賴外部存儲查詢延遲)

典型場景

實時數據補全(允許延遲)

時間區間關聯(如訂單支付)

歷史版本關聯(如審計)

外部維表查詢(如商品信息)

2. 選型決策樹

選擇 Flink SQL Join 方式時,可按以下流程決策:

(1) 是否關聯外部維表(如 MySQL、Redis)?

  • 是 → 選擇 Lookup Join(需優化異步 IO 和緩存)。
  • 否 → 進入下一步。

(2) 是否需要關聯歷史版本數據(如訂單發生時的用戶等級)?

  • 是 → 選擇 Temporal Join(需維度表是 changelog 流,如 CDC)。
  • 否 → 進入下一步。

(3) 是否有嚴格的時間區間限制(如訂單 5 分鐘內支付)?

  • 是 → 選擇 Interval Join(需定義事件時間和 WATERMARK)。
  • 否 → 選擇 Regular Join(需配置 State TTL 避免狀態膨脹)。

3. 性能優化建議

(1) Regular Join:配置 'join.state.ttl' 清理過期狀態,避免 OOM;優先使用 INNER JOIN 減少狀態存儲。

(2) Interval Join:合理設置時間區間(避免過寬導致狀態膨脹,過窄導致數據遺漏);調整 Watermark 延遲(如 INTERVAL '1' MINUTE)平衡數據完整性和實時性。

(3) Temporal Join:為維度表配置 'changelog.state.ttl' 清理過期版本;使用事件時間(FOR SYSTEM_TIME AS OF o.event_time)而非處理時間,確保歷史版本關聯準確性。

(4) Lookup Join:啟用異步 IO('lookup.async' = 'true')提升吞吐量;配置緩存('lookup.cache.max-rows' 和 'lookup.cache.ttl')減少外部存儲壓力;選擇高性能外部存儲(如 Redis 替代 MySQL)。

(5) 通用優化:

  • KeyBy 選擇:確保 Join Key 分布均勻,避免數據傾斜(如 user_id 比隨機 ID 更好)。
  • 并行度調整:根據數據量和資源設置合理并行度(如 Kafka 分區數與 Flink 并行度一致)。
  • 監控與調優:通過 Flink Web UI 監控 State 大小、Checkpoint 時間、算子延遲等指標,動態調整參數。

七、實戰案例:電商實時數倉中的 Join 應用

1. 場景背景

某電商平臺需構建實時數倉,核心需求包括:

  • 訂單實時關聯用戶信息:獲取訂單發生時的用戶等級(歷史版本),用于用戶分群分析。
  • 訂單支付實時關聯:統計訂單產生后 5 分鐘內的支付成功率,監控支付轉化漏斗。
  • 訂單商品實時補全:關聯商品維表(MySQL),獲取商品名稱、類目等信息,用于實時大屏展示。

2. 技術選型

根據需求,結合四種 Join 方式的特點,選型如下:

  • 訂單關聯用戶等級:使用 Temporal Join(用戶表通過 CDC 同步,需歷史版本)。
  • 訂單支付關聯:使用 Interval Join(5 分鐘時間區間限制,自動清理狀態)。
  • 訂單商品補全:使用 Lookup Join(商品表數據量大,存儲在 MySQL,需實時查詢)。

3. 架構設計

Kafka(訂單流) → Flink SQL(Temporal Join + Interval Join + Lookup Join) → Kafka(結果層)
                     ↑
MySQL(用戶 CDC)   Redis(商品緩存,可選)

4. 核心代碼實現

步驟 1:創建源表

-- 訂單流(Kafka)
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_status STRING,  -- CREATED, PAID, CANCELED
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='orders',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- 用戶 CDC 表(MySQL)
CREATE TABLE users_cdc (
    user_id STRING,
    user_name STRING,
    level STRING,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time -INTERVAL'5'SECOND,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector'='mysql-cdc',
    'hostname'='localhost',
    'port'='3306',
    'username'='root',
    'password'='123456',
    'database-name'='flink_test',
    'table-name'='users'
);

-- 支付流(Kafka)
CREATE TABLE payments (
    payment_id STRING,
    order_id STRING,
    amount DECIMAL(10, 2),
    payment_status STRING,  -- SUCCESS, FAIL
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND
) WITH (
    'connector'='kafka',
    'topic'='payments',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- 商品維表(MySQL)
CREATE TABLE products (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DECIMAL(10, 2),
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://localhost:3306/flink_test',
    'username'='root',
    'password'='123456',
    'table-name'='products',
    'lookup.cache.max-rows'='50000',
    'lookup.cache.ttl'='30min'
);

步驟 2:訂單關聯用戶等級(Temporal Join)

-- 訂單用戶關聯結果表(Kafka)
CREATE TABLE order_user_result (
    order_id STRING,
    user_id STRING,
    user_name STRING,
    level STRING,
    order_time TIMESTAMP(3)
) WITH (
    'connector'='kafka',
    'topic'='order_user_result',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- Temporal Join:關聯訂單發生時的用戶等級
INSERT INTO order_user_result
SELECT
    o.order_id,
    o.user_id,
    u.user_name,
    u.level,
    o.event_time AS order_time
FROM orders AS o
JOIN users_cdc FORSYSTEM_TIMEASOF o.event_time AS u
ON o.user_id = u.user_id;

步驟 3:訂單支付關聯(Interval Join)

-- 訂單支付關聯結果表(Kafka)
CREATE TABLE order_payment_result (
    order_id STRING,
    payment_id STRING,
    payment_status STRING,
    order_time TIMESTAMP(3),
    payment_time TIMESTAMP(3)
) WITH (
    'connector'='kafka',
    'topic'='order_payment_result',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- Interval Join:關聯訂單 5 分鐘內的支付記錄
INSERT INTO order_payment_result
SELECT
    o.order_id,
    p.payment_id,
    p.payment_status,
    o.event_time AS order_time,
    p.event_time AS payment_time
FROM orders AS o
JOIN payments AS p
ON o.order_id = p.order_id
AND p.event_time BETWEEN o.event_time AND o.event_time +INTERVAL'5'MINUTE;

步驟 4:訂單商品補全(Lookup Join)

-- 訂單商品關聯結果表(Kafka)
CREATE TABLE order_product_result (
    order_id STRING,
    product_id STRING,
    product_name STRING,
    category STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector'='kafka',
    'topic'='order_product_result',
    'properties.bootstrap.servers'='localhost:9092',
    'format'='json'
);

-- Lookup Join:關聯商品維表
INSERT INTO order_product_result
SELECT
    o.order_id,
    o.product_id,
    p.product_name,
    p.category,
    o.amount
FROM orders AS o
LEFTJOIN products FORSYSTEM_TIMEASOF o.event_time AS p
ON o.product_id = p.product_id;

5. 結果驗證與分析

(1) 測試數據

訂單流:

{"order_id":"order_1","user_id":"101","product_id":"P001","amount":5999.00,"order_status":"CREATED","event_time":"2023-10-01 10:00:00"}
{"order_id":"order_2","user_id":"102","product_id":"P002","amount":8999.00,"order_status":"CREATED","event_time":"2023-10-01 10:01:00"}

用戶 CDC(初始數據:101 為普通,102 為 VIP;10:02:00 101 升級為 VIP):

-- 初始
INSERT INTO users (user_id, user_name, level, update_time) VALUES ('101', 'Alice', '普通', '2023-10-01 09:00:00');
INSERT INTO users (user_id, user_name, level, update_time) VALUES ('102', 'Bob', 'VIP', '2023-10-01 09:00:00');
-- 升級
UPDATE users SET level = 'VIP', update_time = '2023-10-01 10:02:00' WHERE user_id = '101';

支付流:

{"payment_id":"pay_1","order_id":"order_1","amount":5999.00,"payment_status":"SUCCESS","event_time":"2023-10-01 10:03:00"}  -- order_1 的 5 分鐘內支付
{"payment_id":"pay_2","order_id":"order_2","amount":8999.00,"payment_status":"FAIL","event_time":"2023-10-01 10:07:00"}      -- order_2 的 5 分鐘內支付(10:01:00-10:06:00,10:07:00 超時,不匹配)

商品維表:

INSERT INTO products (product_id, product_name, category, price) VALUES ('P001', '手機', '數碼', 5999.00);
INSERT INTO products (product_id, product_name, category, price) VALUES ('P002', '電腦', '數碼', 8999.00);

(2) 結果分析

① 訂單用戶關聯(Temporal Join):

order_1: user_id=101, level=普通(10:00:00 的版本,升級不影響)
order_2: user_id=102, level=VIP(10:01:00 的版本)

符合預期,關聯了訂單發生時的用戶等級。

② 訂單支付關聯(Interval Join):

order_1: payment_id=pay_1, status=SUCCESS(10:03:00 在 10:00:00-10:05:00 內)
order_2: 無匹配(10:07:00 超出 10:01:00-10:06:00)

僅關聯了 5 分鐘內的支付記錄,超時支付被過濾。

③ 訂單商品補全(Lookup Join):

order_1: product_name=手機, category=數碼
order_2: product_name=電腦, category=數碼

實時查詢 MySQL 商品維表,成功補全商品信息。

八、總結與展望

Flink SQL 的四種 Join 方式覆蓋了流處理中所有常見的關聯場景:

  • Regular Join:通用雙流 Join,適用于對數據完整性要求高、允許延遲的小數據量場景。
  • Interval Join:基于時間區間的雙流 Join,解決時間關聯需求,自動清理狀態,適合大數據量。
  • Temporal Join:流與 changelog 維表的歷史版本 Join,滿足審計、歷史分析等場景。
  • Lookup Join:流與外部維表的實時查詢 Join,適用于維表數據量大、需實時更新的場景。
責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2014-12-25 09:41:15

Android加載方式

2024-03-20 15:33:12

2009-04-02 09:46:19

排名函數排序SQL 2005

2010-09-28 15:40:51

SQL刪除重復記錄

2023-10-21 21:13:00

索引SQL工具

2013-10-17 09:25:52

2021-12-22 09:34:01

Golagn配置方式

2020-11-10 10:08:41

Kubernetes容器開發

2020-06-24 07:49:13

Kubernetes場景網絡

2019-10-25 10:35:49

Java用法場景

2013-06-14 15:24:57

Android開發移動開發數據存儲方式

2010-08-05 09:33:08

Flex頁面跳轉

2020-06-12 08:28:29

JavaScript開發技術

2017-04-17 19:31:03

Android多線程

2010-07-28 13:54:42

Flex數據綁定

2022-03-25 14:47:24

Javascript數據類型開發

2023-05-22 08:03:28

JavaScrip枚舉定義

2021-08-02 14:37:36

鴻蒙HarmonyOS應用

2021-06-25 08:00:00

物聯網醫療技術

2013-05-13 09:48:47

網絡接入接入方法綜合布線
點贊
收藏

51CTO技術棧公眾號

色悠悠久久综合网| 亚洲精品在线视频观看| youjizz在线视频| 欧美精品羞羞答答| 欧美日韩一区二区三区视频| 亚洲免费久久| 丰满人妻妇伦又伦精品国产| 香蕉视频成人在线观看| 日韩在线视频一区| 在线观看亚洲免费视频| 性高爱久久久久久久久| 亚洲欧美日韩中文字幕一区二区三区 | 中文字幕在线免费| 国产一区二区女| 91精品国产乱码久久久久久久久| 人妻视频一区二区| 深夜福利一区二区三区| 色久综合一二码| 欧美交换配乱吟粗大25p| 免费一级毛片在线观看| 国产美女视频一区| 日韩免费中文字幕| 免费在线黄色片| 青青草国产成人a∨下载安卓| 日韩欧美你懂的| wwww.国产| 午夜av不卡| 一区二区三区欧美日| 欧美13一14另类| 亚洲黄色在线播放| 久久99精品久久久| 国产精品 欧美在线| 国产精品1000| 欧美日韩国产探花| 久久精品在线播放| 超碰人人人人人人人| 校花撩起jk露出白色内裤国产精品 | 亚洲一区网址| 这里只有精品99re| www.99在线| 欧美1级2级| 狠狠躁夜夜躁久久躁别揉| 欧美激情亚洲天堂| av片在线观看| 亚洲日本在线a| 亚洲一区二区三区在线观看视频| 青青草视频免费在线观看| 成人精品视频一区二区三区| 7777奇米亚洲综合久久| 97国产精品久久久| 久久成人免费电影| 国产综合视频在线观看| 亚洲视频在线观看免费视频| 日本中文一区二区三区| 国产精品免费一区豆花| 欧美一级黄视频| 男女男精品视频网| 国产欧美日韩亚洲精品| 亚洲综合精品在线| 老司机免费视频一区二区三区| 国产精品日韩在线播放| 中文字幕人妻一区二区在线视频| 日韩高清在线电影| 国产精品入口尤物| 97超碰人人草| 懂色中文一区二区在线播放| 亚洲精品日韩av| 午夜精品久久久久久久91蜜桃| 国产成人免费视频一区| 国产精品视频入口| 亚洲AV无码国产精品午夜字幕 | 朝桐光av一区二区三区| 日韩mv欧美mv国产网站| 亚洲欧美在线一区| 日韩精品电影一区二区三区| 日韩一区二区在线免费| 久久精品成人欧美大片古装| 久久久精品视频免费观看| 国产精品黄色| 欧美最猛性xxxxx(亚洲精品)| 久久精品无码av| 另类成人小视频在线| 91在线免费观看网站| 日韩在线观看视频网站| 久久人人超碰精品| 亚洲一区二区三区精品视频| 黄页网站大全在线免费观看| 欧美视频在线观看 亚洲欧| 青青青免费在线| 国产精品毛片久久久久久久久久99999999| 欧美日韩中字一区| 美国黄色一级视频| 精品国产aⅴ| 久久成人人人人精品欧| 青青草免费观看视频| 久久成人免费网站| 久久久99爱| 99福利在线| 色偷偷久久人人79超碰人人澡| wwwwwxxxx日本| 日日天天久久| 欧美成人一区在线| 在线观看你懂的网站| 高清shemale亚洲人妖| 日韩三级在线播放| 999福利在线视频| 欧美日韩国产成人在线91| 中国xxxx性xxxx产国| 欧美第一精品| 日本精品在线视频| 成人h动漫精品一区二区无码| 欧美极品少妇xxxxⅹ高跟鞋| 18禁网站免费无遮挡无码中文| 欧美一区二区三区婷婷| 日韩精品在线免费观看| 久久久精品国产sm调教| 免费成人美女在线观看| 久久一区二区精品| 里番在线播放| 欧美一区二区福利在线| 国产又黄又粗的视频| 野花国产精品入口| 亚洲综合精品一区二区| av在线天堂播放| 精品久久久精品| www.成人黄色| 日韩精品视频中文字幕| 日韩最新av在线| 一级成人黄色片| 国产成人精品免费看| 日韩中文字幕一区二区| 91黄页在线观看| 制服丝袜亚洲网站| 一区二区三区在线播放视频| 免播放器亚洲| 国产精品一级久久久| 久久精品视频免费看| 日本韩国欧美三级| 91精品蜜臀一区二区三区在线| 亚洲欧美综合在线精品| 爱情岛论坛vip永久入口| 高清一区二区三区| 久久综合免费视频影院| 最近中文在线观看| 国产欧美日韩激情| 97xxxxx| 高清一区二区三区| 欧美大学生性色视频| 一区二区国产欧美| 中文字幕中文在线不卡住| 亚洲色精品三区二区一区| 成人偷拍自拍| 欧美激情综合色| av男人天堂av| 国产精品国产三级国产aⅴ中文| 亚洲自偷自拍熟女另类| 久久夜色精品国产噜噜av小说| 欧美乱人伦中文字幕在线| 国产精品伊人久久| 亚洲欧洲成人自拍| 妖精视频在线观看| 午夜久久99| 亚洲xxx大片| 97影院秋霞午夜在线观看| 欧美日韩国产a| 日韩中文字幕电影| 日韩精品1区2区3区| 日本10禁啪啪无遮挡免费一区二区| av影视在线| 亚洲精品成人久久电影| 欧美福利视频一区二区| 91蝌蚪porny九色| 欧美 日韩精品| 成人在线丰满少妇av| 国产精品免费福利| 国产网友自拍视频导航网站在线观看| 日韩视频123| 久久精品性爱视频| 91天堂素人约啪| 欧美日韩亚洲一二三| 欧美久久综合网| 2019国产精品自在线拍国产不卡| 你懂的视频在线| 欧美综合天天夜夜久久| 青青青视频在线免费观看| 狠狠久久亚洲欧美| 男人添女荫道口喷水视频| 亚洲涩涩av| 国产精品免费电影| av在线影院| 亚洲国产精品人人爽夜夜爽| 凹凸精品一区二区三区| 亚洲精品一二三区| 一卡二卡三卡四卡| 国产裸体歌舞团一区二区| 国产精品丝袜久久久久久消防器材| 日韩黄色大片| 高清国产在线一区| 欧美日一区二区三区| 欧美成人一二三| 男女视频在线观看免费| 日韩限制级电影在线观看| 亚欧视频在线观看| 国产精品久久久久影院亚瑟| 韩国三级在线看| 亚洲一区日韩| 国产 国语对白 露脸| 台湾佬综合网| 成人精品在线视频| 激情aⅴ欧美一区二区欲海潮| 神马久久桃色视频| 日韩专区第一页| 欧美美女视频在线观看| 久久夜色精品亚洲| 亚洲欧美综合在线精品| 一区二区黄色片| 国产成人免费视| 一级在线免费视频| 99伊人成综合| 中文字幕色一区二区| 国产乱码精品一区二区三区四区| 99久久无色码| 国产91精品在线| 97热在线精品视频在线观看| 在线免费黄色| 日韩精品免费在线视频观看| 国产三级三级在线观看| 色欧美片视频在线观看| 国产亚洲欧美精品久久久www| 亚洲三级免费电影| 日本理论中文字幕| av电影一区二区| 九九九久久久久久久| 一本久道久久综合婷婷鲸鱼| 国产一二三区在线播放| 欧美激情另类| 国模精品娜娜一二三区| 高清一区二区| 成人妇女免费播放久久久| 91福利区在线观看| 91国内在线视频| 欧美草逼视频| 久久精品最新地址| 福利小视频在线观看| 在线日韩精品视频| av女名字大全列表| 欧美精品一区二区久久久| 国产特级黄色片| 欧美精选午夜久久久乱码6080| 在线观看免费中文字幕| 欧美综合色免费| 中国女人一级一次看片| 日本电影亚洲天堂一区| 中文字幕乱伦视频| 在线观看亚洲专区| 亚洲成人av网址| 色94色欧美sute亚洲线路一ni| 国产污视频在线看| 精品二区三区线观看| 国产在线精品观看| 香蕉乱码成人久久天堂爱免费| 欧美交换国产一区内射| 性感美女极品91精品| 天堂网一区二区三区| 精品久久久久久亚洲国产300| 日本熟妇毛耸耸xxxxxx| 在线精品国精品国产尤物884a| 欧美国产一级片| 欧美精品一卡两卡| va婷婷在线免费观看| 亚洲第一网站免费视频| 欧美一区二区在线观看视频| 亚洲精品www久久久久久广东| 亚洲AV第二区国产精品| 亚洲精品456在线播放狼人| 欧美女优在线观看| 中文字幕9999| 黄色精品在线观看| 欧美激情中文字幕乱码免费| 中文字幕人成乱码在线观看| 日韩av理论片| 国语自产精品视频在线看抢先版结局 | 天天躁日日躁狠狠躁av麻豆男男| 不卡一区二区三区四区| 日韩高清一二三区| 国产婷婷色一区二区三区| 欧美午夜激情影院| 伊人夜夜躁av伊人久久| 日韩精品视频免费看| 欧美日韩在线电影| 亚洲国产av一区二区| 国产视频精品va久久久久久| 国产一二在线观看| 欧美日韩成人精品| 第84页国产精品| 国产一区红桃视频| 激情小说亚洲色图| 国产高清精品软男同| 在线观看的日韩av| 少妇黄色一级片| 粉嫩久久99精品久久久久久夜| 久久久精品成人| 亚洲一区二区三区国产| 久久久久久在线观看| 制服丝袜av成人在线看| 毛片网站在线观看| 欧美成人精品三级在线观看| 三级在线观看视频| 91色中文字幕| 日本黄色精品| 黄网站欧美内射| 韩国一区二区在线观看| 久久久国产精品无码| 亚洲天堂a在线| 日日夜夜狠狠操| 欧美大黄免费观看| 青青色在线视频| 91精品国产91久久| 国内精品视频| 日本婷婷久久久久久久久一区二区 | 精品99一区二区三区| 国产中文在线| 久久全国免费视频| 日韩三级成人| 日韩欧美视频一区二区三区四区| 欧美在线三级| 日韩不卡一二三| 久久色视频免费观看| 色网站在线播放| 欧美一二三四在线| 成黄免费在线| 51午夜精品视频| 丝袜久久网站| 国产人妻777人伦精品hd| 国产精品一色哟哟哟| 成人在线观看高清| 欧美三级午夜理伦三级中视频| 亚洲 精品 综合 精品 自拍| 欧美噜噜久久久xxx| 亚洲精品18| 91免费视频黄| 久久精品国产亚洲aⅴ| 亚洲精品乱码久久久久久久| 午夜av电影一区| 懂色av一区二区三区四区| 久久久国产一区二区| 国产精选久久| 制服诱惑一区| 久久国产精品99久久人人澡| 国产视频不卡在线| 欧美美女一区二区| 久久日韩视频| 91色在线视频| 国精品一区二区三区| 日本女人黄色片| 一区二区三区蜜桃网| 人妻视频一区二区三区| 国模私拍一区二区三区| 国产精品videossex| 久草视频国产在线| 91污在线观看| 潘金莲一级淫片aaaaaa播放| 亚洲人成电影在线播放| 成人在线观看免费视频| 先锋影音日韩| 久久av中文字幕片| 国产无遮挡在线观看| 欧美一区在线视频| 亚洲淫性视频| 国产精华一区| 新狼窝色av性久久久久久| 中文字幕丰满孑伦无码专区| 欧美日韩午夜剧场| 五十路在线视频| 日本三级久久久| 九色精品91| 涩多多在线观看| 亚洲制服丝袜av| 午夜视频福利在线| 国产美女被下药99| 国产精品成久久久久| 成年人看片网站| 狠狠操狠狠色综合网| 国产天堂素人系列在线视频| 国产拍精品一二三| 久久精品国产大片免费观看| 国产大学生av| 欧美视频裸体精品| 永久av在线| 国产成人一区二区三区免费看| 亚洲精品乱码久久久久久蜜桃麻豆| theav精尽人亡av| 欧美精品在线观看播放| 美女日批视频在线观看| 久久综合福利| 免费观看久久久4p| 国产精品18p| 中文字幕日韩精品在线观看| 日韩高清在线观看一区二区| 国内自拍视频网|