精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Apache Flink 漫談系列(09) - JOIN 算子

開發 開發工具
本篇將詳盡的為大家介紹傳統數據庫為什么需要JOIN算子,以及JOIN算子在Apache Flink中的底層實現原理和在實際使用中的優化!

聊什么

在《Apache Flink 漫談系列 - SQL概覽》中我們介紹了JOIN算子的語義和基本的使用方式,介紹過程中大家發現Apache Flink在語法語義上是遵循ANSI-SQL標準的,那么再深思一下傳統數據庫為啥需要有JOIN算子呢?在實現原理上面Apache Flink內部實現和傳統數據庫有什么區別呢?本篇將詳盡的為大家介紹傳統數據庫為什么需要JOIN算子,以及JOIN算子在Apache Flink中的底層實現原理和在實際使用中的優化!

什么是JOIN

在《Apache Flink 漫談系列 - SQL概覽》中我對JOIN算子有過簡單的介紹,這里我們以具體實例的方式讓大家對JOIN算子加深印象。JOIN的本質是分別從N(N>=1)張表中獲取不同的字段,進而得到最完整的記錄行。比如我們有一個查詢需求:在學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)中查詢所有學生的姓名,課程名和考試分數。如下:

為啥需要JOIN

JOIN的本質是數據拼接,那么如果我們將所有數據列存儲在一張大表中,是不是就不需要JOIN了呢?如果真的能將所需的數據都在一張表存儲,我想就真的不需要JOIN的算子了,但現實業務中真的能做到將所需數據放到同一張大表里面嗎?答案是否定的,核心原因有2個:

(1)產生數據的源頭可能不是一個系統;

(2)產生數據的源頭是同一個系統,但是數據冗余的沉重代價,迫使我們會遵循數據庫范式,進行表的設計。簡說NF如下:

  • 1NF - 列不可再分;
  • 2NF - 符合1NF,并且非主鍵屬性全部依賴于主鍵屬性;
  • 3NF - 符合2NF,并且消除傳遞依賴,即:任何字段不能由其他字段派生出來;
  • BCNF - 符合3NF,并且主鍵屬性之間無依賴關系。

當然還有 4NF,5NF,不過在實際的數據庫設計過程中做到BCNF已經足夠了!(并非否定4NF,5NF存在的意義,只是個人還沒有遇到一定要用4NF,5NF的場景,設計往往會按存儲成本,查詢性能等綜合因素考量)

JOIN種類

JOIN 在傳統數據庫中有如下分類:

(1)CROSS JOIN - 交叉連接,計算笛卡兒積;

(2)INNER JOIN - 內連接,返回滿足條件的記錄;

(3)OUTER JOIN

  • LEFT - 返回左表所有行,右表不存在補NULL;
  • RIGHT - 返回右表所有行,左邊不存在補NULL;
  • FULL - 返回左表和右表的并集,不存在一邊補NULL;

(4)SELF JOIN - 自連接,將表查詢時候命名不同的別名。

JOIN語法

JOIN 在SQL89和SQL92中有不同的語法,以INNER JOIN為例說明:

  • SQL89 - 表之間用“,”逗號分割,鏈接條件和過濾條件都在Where子句指定:
    1. SELECT 
    2.   a.colA, 
    3.   b.colA 
    4. FROM  
    5.   tab1 AS a , tab2 AS b 
    6. WHERE a.id = b.id and a.other > b.other 
  • SQL92 - SQL92將鏈接條件在ON子句指定,過濾條件在WHERE子句指定,邏輯更為清晰:
    1. SELECT 
    2.   a.colA, 
    3.   b.colA 
    4. FROM 
    5.   tab1 AS a JOIN tab2 AS b ON a.id = b.id 
    6. WHERE 
    7.   a.other > b.other 

本篇中的后續示例將應用SQL92語法進行SQL的編寫,語法如下:

  1. tableExpression [ LEFT|RIGHT|FULL|INNER|SELF ] JOIN tableExpression [ ON joinCondition ] [WHERE filterCondition] 

語義示例說明

在《Apache Flink 漫談系列 - SQL概覽》中對JOIN語義有過簡單介紹,這里會進行展開介紹。 我們以開篇示例中的三張表:學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)來介紹各種JOIN的語義。

1. CROSS JOIN

交叉連接會對兩個表進行笛卡爾積,也就是LEFT表的每一行和RIGHT表的所有行進行聯接,因此生成結果表的行數是兩個表行數的乘積,如student和course表的CROSS JOIN結果如下:

  1. mysql> SELECT * FROM student JOIN course; 
  2. +------+-------+------+-----+-------+--------+ 
  3. | no   | name  | sex  | no  | name  | credit | 
  4. +------+-------+------+-----+-------+--------+ 
  5. | S001 | Sunny | M    | C01 | Java  |      2 | 
  6. | S002 | Tom   | F    | C01 | Java  |      2 | 
  7. | S003 | Kevin | M    | C01 | Java  |      2 | 
  8. | S001 | Sunny | M    | C02 | Blink |      3 | 
  9. | S002 | Tom   | F    | C02 | Blink |      3 | 
  10. | S003 | Kevin | M    | C02 | Blink |      3 | 
  11. | S001 | Sunny | M    | C03 | Spark |      3 | 
  12. | S002 | Tom   | F    | C03 | Spark |      3 | 
  13. | S003 | Kevin | M    | C03 | Spark |      3 | 
  14. +------+-------+------+-----+-------+--------+ 
  15. 9 rows in set (0.00 sec) 

如上結果我們得到9行=student(3) x course(3)。交叉聯接一般會消耗較大的資源,也被很多用戶質疑交叉聯接存在的意義?(任何時候我們都有質疑的權利,同時也建議我們養成自己質疑自己“質疑”的習慣,就像小時候不理解父母的“廢話”一樣)。

我們以開篇的示例說明交叉聯接的巧妙之一,開篇中我們的查詢需求是:在學生表(學號,姓名,性別),課程表(課程號,課程名,學分)和成績表(學號,課程號,分數)中查詢所有學生的姓名,課程名和考試分數。開篇中的SQL語句得到的結果如下:

  1. mysql> SELECT 
  2.     ->   student.name, course.name, score 
  3.     -> FROM student JOIN  score ON student.no = score.s_no 
  4.     ->              JOIN course ON score.c_no = course.no; 
  5. +-------+-------+-------+ 
  6. | name  | name  | score | 
  7. +-------+-------+-------+ 
  8. | Sunny | Java  |    80 | 
  9. | Sunny | Blink |    98 | 
  10. | Sunny | Spark |    76 | 
  11. | Kevin | Java  |    78 | 
  12. | Kevin | Blink |    88 | 
  13. | Kevin | Spark |    68 | 
  14. +-------+-------+-------+ 
  15. 6 rows in set (0.00 sec) 

如上INNER JOIN的結果我們發現少了Tom同學的成績,原因是Tom同學沒有參加考試,在score表中沒有Tom的成績,但是我們可能希望雖然Tom沒有參加考試但仍然希望Tom的成績能夠在查詢結果中顯示(成績 0 分),面對這樣的需求,我們怎么處理呢?交叉聯接可以幫助我們:

  • ***步 student和course 進行交叉聯接:
    1. mysql> SELECT 
    2.     ->   stu.no, c.no, stu.name, c.name 
    3.     -> FROM student stu JOIN course c  笛卡爾積 
    4.     -> ORDER BY stu.no; -- 排序只是方便大家查看:) 
    5. +------+-----+-------+-------+ 
    6. | no   | no  | name  | name  | 
    7. +------+-----+-------+-------+ 
    8. | S001 | C03 | Sunny | Spark | 
    9. | S001 | C01 | Sunny | Java  | 
    10. | S001 | C02 | Sunny | Blink | 
    11. | S002 | C03 | Tom   | Spark | 
    12. | S002 | C01 | Tom   | Java  | 
    13. | S002 | C02 | Tom   | Blink | 
    14. | S003 | C02 | Kevin | Blink | 
    15. | S003 | C03 | Kevin | Spark | 
    16. | S003 | C01 | Kevin | Java  | 
    17. +------+-----+-------+-------+ 
    18. 9 rows in set (0.00 sec) 
  • 第二步 將交叉聯接的結果與score表進行左外聯接,如下:
    1. mysql> SELECT 
    2.     ->   stu.no, c.no, stu.name, c.name, 
    3.     ->    CASE 
    4.     ->     WHEN s.score IS NULL THEN 0 
    5.     ->     ELSE s.score 
    6.     ->   END AS score 
    7.     -> FROM student stu JOIN course c  -- 迪卡爾積 
    8.     -> LEFT JOIN score s ON sstu.no = s.s_no and c.no = s.c_no -- LEFT OUTER JOIN 
    9.     -> ORDER BY stu.no; -- 排序只是為了大家好看一點:) 
    10. +------+-----+-------+-------+-------+ 
    11. | no   | no  | name  | name  | score | 
    12. +------+-----+-------+-------+-------+ 
    13. | S001 | C03 | Sunny | Spark |    76 | 
    14. | S001 | C01 | Sunny | Java  |    80 | 
    15. | S001 | C02 | Sunny | Blink |    98 | 
    16. | S002 | C02 | Tom   | Blink |     0 | -- TOM 雖然沒有參加考試,但是仍然看到他的信息 
    17. | S002 | C03 | Tom   | Spark |     0 | 
    18. | S002 | C01 | Tom   | Java  |     0 | 
    19. | S003 | C02 | Kevin | Blink |    88 | 
    20. | S003 | C03 | Kevin | Spark |    68 | 
    21. | S003 | C01 | Kevin | Java  |    78 | 
    22. +------+-----+-------+-------+-------+ 
    23. 9 rows in set (0.00 sec) 

經過CROSS JOIN幫我們將Tom的信息也查詢出來了!(TOM 雖然沒有參加考試,但是仍然看到他的信息)

2. INNER JOIN

內聯接在SQL92中 ON 表示聯接添加,可選的WHERE子句表示過濾條件,如開篇的示例就是一個多表的內聯接,我們在看一個簡單的示例: 查詢成績大于80分的學生學號,學生姓名和成績:

  1. mysql> SELECT 
  2.     ->   stu.no, stu.name , s.score 
  3.     -> FROM student stu JOIN score s ON  sstu.no = s.s_no 
  4.     -> WHERE s.score > 80; 
  5. +------+-------+-------+ 
  6. | no   | name  | score | 
  7. +------+-------+-------+ 
  8. | S001 | Sunny |    98 | 
  9. | S003 | Kevin |    88 | 
  10. +------+-------+-------+ 
  11. 2 rows in set (0.00 sec) 

上面按語義的邏輯是:

  • ***步:先進行student和score的內連接,如下:
    1. mysql> SELECT 
    2.     ->   stu.no, stu.name , s.score 
    3.     -> FROM student stu JOIN score s ON  sstu.no = s.s_no ; 
    4. +------+-------+-------+ 
    5. | no   | name  | score | 
    6. +------+-------+-------+ 
    7. | S001 | Sunny |    80 | 
    8. | S001 | Sunny |    98 | 
    9. | S001 | Sunny |    76 | 
    10. | S003 | Kevin |    78 | 
    11. | S003 | Kevin |    88 | 
    12. | S003 | Kevin |    68 | 
    13. +------+-------+-------+ 
    14. 6 rows in set (0.00 sec) 
  • 第二步:對內聯結果進行過濾, score > 80 得到,如下最終結果:
    1. -> WHERE s.score > 80; 
    2. +------+-------+-------+ 
    3. | no   | name  | score | 
    4. +------+-------+-------+ 
    5. | S001 | Sunny |    98 | 
    6. | S003 | Kevin |    88 | 
    7. +------+-------+-------+ 
    8. 2 rows in set (0.00 sec)  

上面的查詢過程符合語義,但是如果在filter條件能過濾很多數據的時候,先進行數據的過濾,在進行內聯接會獲取更好的性能,比如我們手工寫一下:

  1. mysql> SELECT 
  2.     ->   no, name , score 
  3.     -> FROM student stu JOIN ( SELECT s_no, score FROM score s WHERE s.score >80) as sc ON no = s_no
  4. +------+-------+-------+ 
  5. | no   | name  | score | 
  6. +------+-------+-------+ 
  7. | S001 | Sunny |    98 | 
  8. | S003 | Kevin |    88 | 
  9. +------+-------+-------+ 
  10. 2 rows in set (0.00 sec) 

上面寫法語義和***種寫法語義一致,得到相同的查詢結果,上面查詢過程是:

  • ***步:執行過濾子查詢
    1. mysql> SELECT s_no, score FROM score s WHERE s.score >80; 
    2. +------+-------+ 
    3. | s_no | score | 
    4. +------+-------+ 
    5. | S001 |    98 | 
    6. | S003 |    88 | 
    7. +------+-------+ 
    8. 2 rows in set (0.00 sec) 
  • 第二步:執行內連接
    1. -> ON no = s_no
    2. +------+-------+-------+ 
    3. | no   | name  | score | 
    4. +------+-------+-------+ 
    5. | S001 | Sunny |    98 | 
    6. | S003 | Kevin |    88 | 
    7. +------+-------+-------+ 
    8. 2 rows in set (0.00 sec) 

如上兩種寫法在語義上一致,但查詢性能在數量很大的情況下會有很大差距。上面為了和大家演示相同的查詢語義,可以有不同的查詢方式,不同的執行計劃。實際上數據庫本身的優化器會自動進行查詢優化,在內聯接中ON的聯接條件和WHERE的過濾條件具有相同的優先級,具體的執行順序可以由數據庫的優化器根據性能消耗決定。也就是說物理執行計劃可以先執行過濾條件進行查詢優化,如果細心的讀者可能發現,在第二個寫法中,子查詢我們不但有行的過濾,也進行了列的裁剪(去除了對查詢結果沒有用的c_no列),這兩個變化實際上對應了數據庫中兩個優化規則:

  • filter push down
  • project push down

如上優化規則以filter push down 為例,示意優化器對執行plan的優化變動:

3. LEFT OUTER JOIN

左外聯接語義是返回左表所有行,右表不存在補NULL,為了演示作用,我們查詢沒有參加考試的所有學生的成績單:

  1. mysql> SELECT 
  2.     ->   no, name , s.c_no, s.score 
  3.     -> FROM student stu LEFT JOIN score s ON sstu.no = s.s_no 
  4.     -> WHERE s.score is NULL; 
  5. +------+------+------+-------+ 
  6. | no   | name | c_no | score | 
  7. +------+------+------+-------+ 
  8. | S002 | Tom  | NULL |  NULL | 
  9. +------+------+------+-------+ 
  10. 1 row in set (0.00 sec) 

上面查詢的執行邏輯上也是分成兩步:

  • ***步:左外聯接查詢
    1. mysql> SELECT 
    2.     ->   no, name , s.c_no, s.score 
    3.     -> FROM student stu LEFT JOIN score s ON sstu.no = s.s_no; 
    4. +------+-------+------+-------+ 
    5. | no   | name  | c_no | score | 
    6. +------+-------+------+-------+ 
    7. | S001 | Sunny | C01  |    80 | 
    8. | S001 | Sunny | C02  |    98 | 
    9. | S001 | Sunny | C03  |    76 | 
    10. | S002 | Tom   | NULL |  NULL | -- 右表不存在的補NULL 
    11. | S003 | Kevin | C01  |    78 | 
    12. | S003 | Kevin | C02  |    88 | 
    13. | S003 | Kevin | C03  |    68 | 
    14. +------+-------+------+-------+ 
    15. 7 rows in set (0.00 sec) 
  • 第二步:過濾查詢
    1. mysql> SELECT  
    2.     ->   no, name , s.c_no, s.score 
    3.     -> FROM student stu LEFT JOIN score s ON sstu.no = s.s_no 
    4.     -> WHERE s.score is NULL; 
    5. +------+------+------+-------+ 
    6. | no   | name | c_no | score | 
    7. +------+------+------+-------+ 
    8. | S002 | Tom  | NULL |  NULL | 
    9. +------+------+------+-------+ 
    10. 1 row in set (0.00 sec) 

這兩個過程和上面分析的INNER JOIN一樣,但是這時候能否利用上面說的 filter push down的優化呢?根據LEFT OUTER JOIN的語義來講,答案是否定的。我們手工操作看一下:

  • ***步:先進行過濾查詢(獲得一個空表)
    1. mysql> SELECT * FROM score s WHERE s.score is NULL; 
    2. Empty set (0.00 sec) 
  • 第二步: 進行左外鏈接
    1. mysql> SELECT 
    2.     ->   no, name , s.c_no, s.score 
    3.     -> FROM student stu LEFT JOIN (SELECT * FROM score s WHERE s.score is NULL) AS s ON sstu.no = s.s_no; 
    4. +------+-------+------+-------+ 
    5. | no   | name  | c_no | score | 
    6. +------+-------+------+-------+ 
    7. | S001 | Sunny | NULL |  NULL | 
    8. | S002 | Tom   | NULL |  NULL | 
    9. | S003 | Kevin | NULL |  NULL | 
    10. +------+-------+------+-------+ 
    11. 3 rows in set (0.00 sec) 

我們發現兩種寫法的結果不一致,***種寫法只返回Tom沒有參加考試,是我們預期的。第二種寫法返回了Sunny,Tom和Kevin三名同學都沒有參加考試,這明顯是非預期的查詢結果。所有LEFT OUTER JOIN不能利用INNER JOIN的 filter push down優化。

4. RIGHT OUTER JOIN

右外鏈接語義是返回右表所有行,左邊不存在補NULL,如下:

  1. mysql> SELECT 
  2.     ->   s.c_no, s.score, no, name 
  3.     -> FROM score s RIGHT JOIN student stu ON sstu.no = s.s_no; 
  4. +------+-------+------+-------+ 
  5. | c_no | score | no   | name  | 
  6. +------+-------+------+-------+ 
  7. | C01  |    80 | S001 | Sunny | 
  8. | C02  |    98 | S001 | Sunny | 
  9. | C03  |    76 | S001 | Sunny | 
  10. | NULL |  NULL | S002 | Tom   | -- 左邊沒有的進行補 NULL 
  11. | C01  |    78 | S003 | Kevin | 
  12. | C02  |    88 | S003 | Kevin | 
  13. | C03  |    68 | S003 | Kevin | 
  14. +------+-------+------+-------+ 
  15. 7 rows in set (0.00 sec) 

上面右外鏈接我只是將上面左外鏈接查詢的左右表交換了一下:)。

5. FULL OUTER JOIN

全外鏈接語義返回左表和右表的并集,不存在一邊補NULL,用于演示的MySQL數據庫不支持FULL OUTER JOIN。這里不做演示了。

6. SELF JOIN

上面介紹的INNER JOIN、OUTER JOIN都是不同表之間的聯接查詢,自聯接是一張表以不同的別名做為左右兩個表,可以進行如上的INNER JOIN和OUTER JOIN。如下看一個INNER 自聯接:

  1. mysql> SELECT * FROM student l JOIN student r where l.no = r.no; 
  2. +------+-------+------+------+-------+------+ 
  3. | no   | name  | sex  | no   | name  | sex  | 
  4. +------+-------+------+------+-------+------+ 
  5. | S001 | Sunny | M    | S001 | Sunny | M    | 
  6. | S002 | Tom   | F    | S002 | Tom   | F    | 
  7. | S003 | Kevin | M    | S003 | Kevin | M    | 
  8. +------+-------+------+------+-------+------+ 
  9. 3 rows in set (0.00 sec)  

7. 不等值聯接

這里說的不等值聯接是SQL92語法里面的ON子句里面只有不等值聯接,比如:

  1. mysql> SELECT 
  2.     ->   s.c_no, s.score, no, name 
  3.     -> FROM score s RIGHT JOIN student stu ON stu.no != s.c_no; 
  4. +------+-------+------+-------+ 
  5. | c_no | score | no   | name  | 
  6. +------+-------+------+-------+ 
  7. | C01  |    80 | S001 | Sunny | 
  8. | C01  |    80 | S002 | Tom   | 
  9. | C01  |    80 | S003 | Kevin | 
  10. | C02  |    98 | S001 | Sunny | 
  11. | C02  |    98 | S002 | Tom   | 
  12. | C02  |    98 | S003 | Kevin | 
  13. | C03  |    76 | S001 | Sunny | 
  14. | C03  |    76 | S002 | Tom   | 
  15. | C03  |    76 | S003 | Kevin | 
  16. | C01  |    78 | S001 | Sunny | 
  17. | C01  |    78 | S002 | Tom   | 
  18. | C01  |    78 | S003 | Kevin | 
  19. | C02  |    88 | S001 | Sunny | 
  20. | C02  |    88 | S002 | Tom   | 
  21. | C02  |    88 | S003 | Kevin | 
  22. | C03  |    68 | S001 | Sunny | 
  23. | C03  |    68 | S002 | Tom   | 
  24. | C03  |    68 | S003 | Kevin | 
  25. +------+-------+------+-------+ 
  26. 18 rows in set (0.00 sec) 

上面這示例,其實沒有什么實際業務價值,在實際的使用場景中,不等值聯接往往是結合等值聯接,將不等值條件在WHERE子句指定,即, 帶有WHERE子句的等值聯接。

Apache Flink雙流JOIN

Apache Flink目前支持INNER JOIN和LEFT OUTER JOIN(SELF 可以轉換為普通的INNER和OUTER)。在語義上面Apache Flink嚴格遵守標準SQL的語義,與上面演示的語義一致。下面我重點介紹Apache Flink中JOIN的實現原理。

1. 雙流JOIN與傳統數據庫表JOIN的區別

傳統數據庫表的JOIN是兩張靜態表的數據聯接,在流上面是 動態表(關于流與動態表的關系請查閱 《Apache Flink 漫談系列 - 流表對偶(duality)性)》,雙流JOIN的數據不斷流入與傳統數據庫表的JOIN有如下3個核心區別:

  • 左右兩邊的數據集合無窮 - 傳統數據庫左右兩個表的數據集合是有限的,雙流JOIN的數據會源源不斷的流入;
  • JOIN的結果不斷產生/更新 - 傳統數據庫表JOIN是一次執行產生最終結果后退出,雙流JOIN會持續不斷的產生新的結果。在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇也有相關介紹。
  • 查詢計算的雙邊驅動 - 雙流JOIN由于左右兩邊的流的速度不一樣,會導致左邊數據到來的時候右邊數據還沒有到來,或者右邊數據到來的時候左邊數據沒有到來,所以在實現中要將左右兩邊的流數據進行保存,以保證JOIN的語義。在Blink中會以State的方式進行數據的存儲。State相關請查看《Apache Flink 漫談系列 - State》篇。

(1) 數據Shuffle

分布式流計算所有數據會進行Shuffle,怎么才能保障左右兩邊流的要JOIN的數據會在相同的節點進行處理呢?在雙流JOIN的場景,我們會利用JOIN中ON的聯接key進行partition,確保兩個流相同的聯接key會在同一個節點處理。

(2) 數據的保存

不論是INNER JOIN還是OUTER JOIN 都需要對左右兩邊的流的數據進行保存,JOIN算子會開辟左右兩個State進行數據存儲,左右兩邊的數據到來時候,進行如下操作:

  • LeftEvent到來存儲到LState,RightEvent到來的時候存儲到RState;
  • LeftEvent會去RightState進行JOIN,并發出所有JOIN之后的Event到下游;
  • RightEvent會去LeftState進行JOIN,并發出所有JOIN之后的Event到下游。

2. 簡單場景介紹實現原理

(1) INNER JOIN 實現

JOIN有很多復雜的場景,我們先以最簡單的場景進行實現原理的介紹,比如:最直接的兩個進行INNER JOIN,比如查詢產品庫存和訂單數量,庫存變化事件流和訂單事件流進行INNER JOIN,JION條件是產品ID,具體如下:

雙流JOIN兩邊事件都會存儲到State里面,如上,事件流按照標號先后流入到join節點,我們假設右邊流比較快,先流入了3個事件,3個事件會存儲到state中,但因為左邊還沒有數據,所有右邊前3個事件流入時候,沒有join結果流出,當左邊***個事件序號為4的流入時候,先存儲左邊state,再與右邊已經流入的3個事件進行join,join的結果如圖 三行結果會流入到下游節點sink。當第5號事件流入時候,也會和左邊第4號事件進行join,流出一條jion結果到下游節點。這里關于INNER JOIN的語義和大家強調兩點:

  • INNER JOIN只有符合JOIN條件時候才會有JOIN結果流出到下游,比如右邊***來的1,2,3個事件,流入時候沒有任何輸出,因為左邊還沒有可以JOIN的事件;
  • INNER JOIN兩邊的數據不論如何亂序,都能夠保證和傳統數據庫語義一致,因為我們保存了左右兩個流的所有事件到state中。

(2) LEFT OUTER JOIN 實現

LEFT OUTER JOIN 可以簡寫 LEFT JOIN,語義上和INNER JOIN的區別是不論右流是否有JOIN的事件,左流的事件都需要流入下游節點,但右流沒有可以JION的事件時候,右邊的事件補NULL。同樣我們以最簡單的場景說明LEFT JOIN的實現,比如查詢產品庫存和訂單數量,庫存變化事件流和訂單事件流進行LEFT JOIN,JION條件是產品ID,具體如下:

下圖也是表達LEFT JOIN的語義,只是展現方式不同:

上圖主要關注點是當左邊先流入1,2事件時候,右邊沒有可以join的事件時候會向下游發送左邊事件并補NULL向下游發出,當右邊***個相同的Join key到來的時候會將左邊先來的事件發出的帶有NULL的事件撤回(對應上面command的-記錄,+代表正向記錄,-代表撤回記錄)。這里強調三點:

  • 左流的事件當右邊沒有JOIN的事件時候,將右邊事件列補NULL后流向下游;* 當右邊事件流入發現左邊已經有可以JOIN的key的時候,并且是***個可以JOIN上的右邊事件(比如上面的3事件是***個可以和左邊JOIN key P001進行JOIN的事件)需要撤回左邊下發的NULL記錄,并下發JOIN完整(帶有右邊事件列)的事件到下游。后續來的4,5,6,8等待后續P001的事件是不會產生撤回記錄的。
  • 在Apache Flink系統內部事件類型分為正向事件標記為“+”和撤回事件標記為“-”。

3. RIGHT OUTER JOIN 和 FULL OUTER JOIN

RIGHT JOIN內部實現與LEFT JOIN類似, FULL JOIN和LEFT JOIN的區別是左右兩邊都會產生補NULL和撤回的操作。對于State的使用都是相似的,這里不再重復說明了。

4. 復雜場景介紹State結構

上面我們介紹了雙流JOIN會使用State記錄左右兩邊流的事件,同時我們示例數據的場景也是比較簡單,比如流上沒有更新事件(沒有撤回事件),同時流上沒有重復行事件。那么我們嘗試思考下面的事件流在雙流JOIN時候是怎么處理的?

上圖示例是連續產生了2筆銷售數量一樣的訂單,同時在產生一筆銷售數量為5的訂單之后,又將該訂單取消了(或者退貨了),這樣在事件流上面就會是上圖的示意,這種情況Blink內部如何支撐呢?

根據JOIN的語義以INNER JOIN為例,右邊有兩條相同的訂單流入,我們就應該向下游輸出兩條JOIN結果,當有撤回的事件流入時候,我們也需要將已經下發下游的JOIN事件撤回,如下:

上面的場景以及LEFT JOIN部分介紹的撤回情況,Apache Flink內部需要處理如下幾個核心點:

  • 記錄重復記錄(完整記錄重復記錄或者記錄相同記錄的個數)
  • 記錄正向記錄和撤回記錄(完整記錄正向和撤回記錄或者記錄個數)
  • 記錄哪一條事件是***個可以與左邊事件進行JOIN的事件

(1) 雙流JOIN的State數據結構

在Apache Flink內部對不同的場景有特殊的數據結構優化,本篇我們只針對上面說的情況(通用設計)介紹一下雙流JOIN的State的數據結構和用途:

數據結構

  1. Map<JoinKey, Map<rowData, count>>
  • ***級MAP的key是Join key,比如示例中的P001, value是流上面的所有完整事件;
  • 第二級MAP的key是行數據,比如示例中的P001, 2,value是相同事件值的個數

數據結構的利用

  • 記錄重復記錄 - 利用第二級MAP的value記錄重復記錄的個數,這樣大大減少存儲和讀取
  • 正向記錄和撤回記錄 - 利用第二級MAP的value記錄,當count=0時候刪除該元素
  • 判斷右邊是否產生撤回記錄 - 根據***級MAP的value的size來判斷是否產生撤回,只有size由0變成1的時候(***條和左可以JOIN的事件)才產生撤回

雙流JOIN的應用優化

1. 構造更新流

我們在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇中以雙流JOIN為例介紹了如何構造業務上的PK source,構造PK source本質上在保證業務語義的同時也是對雙流JOIN的一種優化,比如多級LEFT JOIN會讓流上的數據不斷膨脹,造成JOIN節點性能較慢,JOIN之后的下游節點邊堵(數據量大導致,非熱點)。那么嫌少流入JOIN的數據,比如構造PK source就會大大減少JOIN數據的膨脹。這里不再重復舉例,大家可以查閱 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》 的雙流JOIN示例部分。

2. NULL造成的熱點

比如我們有A LEFT JOIN B ON A.aCol = B.bCol LEFT JOIN C ON B.cCol = C.cCol 的業務,JOB的DAG如下:

假設在實際業務中有這樣的特點,大部分時候當A事件流入的時候,B還沒有可以JOIN的數據,但是B來的時候,A已經有可以JOIN的數據了,這特點就會導致,A LEFT JOIN B 會產生大量的 (A, NULL),其中包括B里面的 cCol 列也是NULL,這時候當與C進行LEFT JOIN的時候,首先Blink內部會利用cCol對AB的JOIN產生的事件流進行Shuffle, cCol是NULL進而是下游節點大量的NULL事件流入,造成熱點。那么這問題如何解決呢?

我們可以改變JOIN的先后順序,來保證A LEFT JOIN B 不會產生NULL的熱點問題,如下:

3. JOIN ReOrder

對于JOIN算子的實現我們知道左右兩邊的事件都會存儲到State中,在流入事件時候在從另一邊讀取所有事件進行JOIN計算,這樣的實現邏輯在數據量很大的場景會有一定的state操作瓶頸,我們某些場景可以通過業務角度調整JOIN的順序,來消除性能瓶頸,比如:A JOIN B ON A.acol = B.bcol JOIN C ON B.bcol = C.ccol. 這樣的場景,如果 A與B進行JOIN產生數據量很大,但是B與C進行JOIN產生的數據量很小,那么我們可以強制調整JOIN的聯接順序,B JOIN C ON b.bcol = c.ccol JOIN A ON a.acol = b.bcol. 如下示意圖:

小結

本篇向大家介紹了數據庫設計范式的要求和實際業務的查詢需要是傳統數據庫JOIN算子存在的原因,并以具體示例的方式向大家介紹JOIN在數據庫的查詢過程,以及潛在的查詢優化,再以實際的例子介紹Apache Flink上面的雙流JOIN的實現原理和State數據結構設計,***向大家介紹兩個雙流JOIN的使用優化。

# 關于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設計研發工作。

【本文為51CTO專欄作者“金竹”原創稿件,轉載請聯系原作者】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2022-07-13 12:53:59

數據存儲

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-06-10 17:26:07

數據集計算

2018-12-29 08:16:32

Apache FlinJOIN代碼

2018-10-09 10:55:52

Apache FlinWatermark流計算

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-10-16 08:54:35

Apache Flin流計算State

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2019-01-03 10:17:53

Apache FlinTable API代碼

2022-07-13 13:03:29

流計算亂序

2022-07-12 10:38:25

分布式框架

2018-11-07 08:48:31

Apache Flin持續查詢流計算

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2020-04-09 11:08:30

PyFlinkJAR依賴

2022-06-20 05:52:27

FlinkTTL流查詢

2018-10-30 11:10:05

Flink數據集計算
點贊
收藏

51CTO技術棧公眾號

亚洲大奶少妇| 牛牛电影国产一区二区| 久久久精品性| 久久天天躁狠狠躁夜夜av| 亚洲热在线视频| 手机在线观看av| 欧美国产日产图区| 高清一区二区三区视频| 亚洲熟女综合色一区二区三区| 日韩一级毛片| 日韩成人在线网站| 亚洲精品在线网址| 成人mm视频在线观看| 亚洲午夜精品在线| 亚洲激情一区二区三区| 神宫寺奈绪一区二区三区| 麻豆精品久久精品色综合| 韩日欧美一区二区| www.99热| 琪琪久久久久日韩精品| 欧美精品 日韩| 成人性视频欧美一区二区三区| 精品国产白色丝袜高跟鞋| 337p粉嫩大胆噜噜噜噜噜91av| 91在线观看免费高清| 午夜久久久久久久久久影院| 激情视频一区二区三区| 久久久91精品| 大吊一区二区三区| 免费欧美视频| 精品香蕉一区二区三区| 国产精品久久久久久在线观看| 91视频亚洲| 欧美色涩在线第一页| 日本成年人网址| 精品极品在线| 午夜视频在线观看一区二区 | 精品综合久久久久| 97久久香蕉国产线看观看| 欧美日韩亚洲视频| 青青草原网站在线观看| 免费黄网站在线| 欧美激情在线一区二区三区| 日本不卡一二三区| 国产高清美女一级毛片久久| 久久久精品综合| 欧美12av| 国产在线超碰| 国产日本欧美一区二区| 日韩久久不卡| 国产对白叫床清晰在线播放| 欧美高清在线一区| 亚洲精品不卡| 成人直播在线| 亚洲永久免费视频| 国产精品入口芒果| 精精国产xxxx视频在线播放| 欧美日韩国产精品一区二区三区四区 | 欧美成人国产一区二区| 亚洲精品久久久久久| 中文字幕亚洲在线观看| 精品91自产拍在线观看一区| 中文字幕18页| 日本午夜精品久久久| 亚洲日本成人网| 妺妺窝人体色WWW精品| 蜜桃国内精品久久久久软件9| 亚洲欧美日韩一区二区在线| 少妇精品无码一区二区免费视频| 日韩精品免费一区二区三区| 久久九九精品99国产精品| 久久久久国产精品夜夜夜夜夜| 国产精品jizz在线观看美国| 久久免费成人精品视频| 国产伦精品一区二区三区视频网站| 天堂资源在线中文精品| 国产一区二区在线播放| 国产高清视频免费观看| 成+人+亚洲+综合天堂| 欧美日韩在线高清| 老司机99精品99| 午夜影院在线观看欧美| 久久久久久久久久久久91| 欧美电影院免费观看| 亚洲成人三级在线| 激情五月深爱五月| 亚洲天堂黄色| 国产精品aaa| 国产美女主播在线观看| 波多野结衣在线一区| 色姑娘综合av| 91美女主播在线视频| 欧美日韩在线精品一区二区三区激情 | 欧美日韩久久| 亚洲精品一区二区精华| 亚洲日本精品视频| 国产精品hd| 国产精品99久久久久久www| 精品人妻一区二区三区含羞草 | 欧美午夜大胆人体| 日本久久电影网| 亚洲欧美激情一区二区三区| 一本久久青青| 欧美黑人xxxⅹ高潮交| 成人小视频在线播放| 成人一级黄色片| 亚洲视频小说| 天堂电影一区| 欧美成人乱码一区二区三区| 天美传媒免费在线观看| 中文一区二区| 国产精品18毛片一区二区| 91亚洲欧美| 欧美色xxxx| 国产亚洲精品成人a| 99tv成人| 国产精品麻豆va在线播放| 天天操天天操天天干| 亚洲女同女同女同女同女同69| 成人在线免费播放视频| 精品国产第一国产综合精品| 在线电影av不卡网址| 国产又爽又黄的视频| 福利一区在线观看| 国产手机视频在线观看| 天堂久久一区| 在线性视频日韩欧美| 欧美性猛交xxxx乱大交hd | 777色狠狠一区二区三区| 无码一区二区三区在线| 香蕉久久a毛片| 精品伦精品一区二区三区视频| 91亚洲天堂| 欧美一区二区精品| 欧产日产国产v| 韩日欧美一区二区三区| 中文字幕一区二区三区有限公司 | 日本精品人妻无码77777| 免费看日韩精品| 日韩三级电影网站| 欧美性猛交xxx高清大费中文| 亚洲国产精品va| 日韩三级视频在线| 99国内精品久久| 国产成人无码a区在线观看视频| 成人av地址| 亚洲18私人小影院| 欧美性受xxxx狂喷水| 亚洲成在线观看| 北京富婆泄欲对白| 国产视频亚洲| 欧洲av一区| 国产精品亚洲成在人线| 最新的欧美黄色| 国产理论片在线观看| 亚洲欧美一区二区三区国产精品 | 午夜啪啪小视频| 久久久久久久久丰满| 亚洲在线观看视频网站| 第四色日韩影片| 亚洲精品午夜精品| 伊人影院中文字幕| 亚洲免费观看高清在线观看| 特种兵之深入敌后| 国产欧美另类| 亚洲欧美日本国产有色| 成人精品在线| 国内精品久久久久久| 肉丝一区二区| 欧美日韩三级视频| 久久久久久福利| www久久精品| 中文字幕永久有效| 好看的av在线不卡观看| 久久涩涩网站| 伊人久久大香| 韩剧1988免费观看全集| a天堂中文在线| 日韩欧美亚洲国产另类| 日韩精品一卡二卡| 国产精品网站在线播放| 免费看污污网站| 欧美在线国产| 欧美第一黄网| 亚洲网址在线观看| 日本精品久久电影| 宅男网站在线免费观看| 精品一区精品二区| a级片在线视频| 欧美午夜久久久| 一级性生活免费视频| av高清不卡在线| 成人综合久久网| 一道本一区二区| 日本一二三区视频在线| 九九热线有精品视频99| 99超碰麻豆| 91成人在线| 136fldh精品导航福利| 黄视频在线观看网站| 亚洲男人天堂视频| 亚洲va欧美va| 欧美高清激情brazzers| 国产精品一区无码| 亚洲电影第三页| 手机在线免费看毛片| 国产亚洲精品久| 国产一级伦理片| 国产精品中文字幕一区二区三区| 男女曰b免费视频| 亚洲片区在线| 91精品国产毛片武则天| 久久美女视频| 日本一区二区在线| 亚洲精品aaaaa| 国产99在线播放| 麻豆一区在线| 国产日韩欧美综合| 欧美日韩国产网站| 欧美一区深夜视频| 成年男女免费视频网站不卡| 欧美精品日韩www.p站| 素人av在线| 国产一区二区三区久久精品| 四虎电影院在线观看| 精品少妇一区二区| 亚洲av综合色区无码一二三区| 欧美精品777| 国产精品国产av| 欧美日韩国产在线观看| 在线观看国产黄| 欧美日韩一区二区三区四区| 成人免费一级片| 色94色欧美sute亚洲线路一ni| 黄色大片网站在线观看| 五月婷婷欧美视频| 日韩和一区二区| 午夜精品一区二区三区免费视频| 久久免费在线观看视频| 夜夜爽夜夜爽精品视频| 久久亚洲av午夜福利精品一区| 一区二区三区精品| 国产大片中文字幕在线观看| 亚洲国产中文字幕| 日本五十路女优| 精品久久香蕉国产线看观看亚洲| 日韩精品视频播放| 日韩欧美在线国产| 欧美成人精品网站| 欧美精品一级二级| 精品久久久久成人码免费动漫| 日韩欧美色电影| 免费a视频在线观看| 精品视频在线播放免| 国产有码在线| www.精品av.com| 丝袜国产在线| 91精品国产高清久久久久久| 三上悠亚一区二区| 国产欧美久久一区二区| 国产亚洲精aa在线看| 成人xxxxx色| 欧美顶级毛片在线播放| 欧美一区二区三区四区在线观看地址| 国产日产一区| 五月天综合婷婷| 精品白丝av| 国产精品人人妻人人爽人人牛| 日韩福利电影在线观看| 超碰在线免费av| 99久久精品免费精品国产| 中字幕一区二区三区乱码| 自拍偷拍亚洲欧美日韩| 中文字幕在线字幕中文| 欧美性色综合网| www.蜜臀av| 亚洲人成网在线播放| 欧美边添边摸边做边爱免费| 韩剧1988在线观看免费完整版 | 国产一区红桃视频| 澳门久久精品| 亚洲日本精品国产第一区| 欧美日一区二区三区在线观看国产免| 色综合av综合无码综合网站| 久久99精品国产麻豆不卡| 中国免费黄色片| 国产精品久久影院| 成人午夜视频精品一区| 91麻豆精品国产91久久久更新时间| 人妻妺妺窝人体色www聚色窝| 中文字幕亚洲在线| bbw在线视频| 亚洲在线观看视频网站| 国产探花一区二区| 青春草国产视频| 久久99精品久久久久婷婷| 欧美大片免费播放器| 日韩美女视频一区| 91丨九色丨海角社区| 精品成人私密视频| 天天在线视频色| 日本精品一区二区三区在线播放视频 | 色综合久久久久综合体 | 亚洲精品影视在线观看| a免费在线观看| 国产精品一二三视频| 欧亚精品一区| 青青视频免费在线| 久色婷婷小香蕉久久| 受虐m奴xxx在线观看| 亚洲成人精品一区二区| 国产aⅴ一区二区三区| 亚洲人成在线播放| 人成在线免费网站| 国产精品果冻传媒潘| 在线中文字幕第一区| 第四色婷婷基地| 国产三级欧美三级| 亚洲黄色激情视频| 亚洲国产精品久久久久久| 成人video亚洲精品| 成人午夜激情网| 色乱码一区二区三区网站| 日本中文字幕高清| 国产亚洲一区字幕| 国产伦精品一区二区三区视频我| 亚洲成色777777女色窝| 久久亚洲资源| 成人免费在线一区二区三区| 中文字幕亚洲精品乱码 | 久久伊人影院| 自拍偷拍99| 极品少妇一区二区三区精品视频 | 天天影视综合| 国产欧美一区二| 亚洲欧洲中文日韩久久av乱码| 91在线视频国产| 久久这里只有精品99| 国产一区一区| 喜爱夜蒲2在线| 国产成人精品午夜视频免费| 欧美成人aaa片一区国产精品| 日韩一区二区高清| 色婷婷在线播放| 国产精品swag| 国产精品久久国产愉拍| 三级网站在线免费观看| 91成人免费电影| 91在线导航| 91久色国产| 一区在线观看| 久久国产精品影院| 欧美主播一区二区三区美女| 色网站在线看| 亚洲最大的免费| 亚洲经典自拍| 国产精品扒开腿做爽爽| 欧美日韩免费一区二区三区| av网址在线看| 国产在线精品二区| 亚洲综合99| 北条麻妃在线观看视频| 日韩女优视频免费观看| 欧亚av在线| 亚洲色图自拍| 粉嫩蜜臀av国产精品网站| 欧美日韩乱国产| 色阁综合伊人av| 超碰97久久国产精品牛牛| 精品久久久久久无码国产| 中文字幕制服丝袜一区二区三区| 午夜精品无码一区二区三区| 97超级碰碰碰久久久| 视频在线不卡免费观看| 乱码一区二区三区| 色婷婷av一区二区| wwwav在线| 欧美精品七区| 极品美女销魂一区二区三区 | 亚洲欧洲视频| 欧洲性xxxx| 亚洲成人国产精品| 免费成人毛片| 91丨porny丨探花| 中文字幕一区二区三区av| 三级网站在线看| 成人av在线网址| 国产农村妇女毛片精品久久莱园子| 欧美日韩中文字幕视频| 亚洲电影免费观看| 91丨精品丨国产| 欧美精品无码一区二区三区| 一区二区三区高清| 高清av在线| 国产另类第一区| 国内一区二区在线| 亚洲国产av一区二区三区| 欧美激情国产高清| 99久久精品费精品国产| 久久久亚洲av波多野结衣|