精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

MapReduce連接:復制連接

數據庫
復制連接是map端的連接。復制連接得名于它的具體實現:連接中最小的數據集將會被復制到所有的map主機節點。復制連接有一個假設前提:在被連接的數據集中,有一個數據集足夠小到可以緩存在內存中。

如圖4.5所示,MapReduce復制連接工作原理如下:

  1. 使用分布式緩存(Districubted cache)將這個小數據集復制到所有運行map任務的節點。

  2. 用各個map任務初始化方法將這個小數據集裝載到一個哈希表(hashtable)中。

  3. 逐條用大數據集中的記錄遍歷這個哈希表,逐個判斷是否符合連接條件。

  4. 輸出符合連接條件的結果。

 

 

復制連接的實現非常直接明了。更具體的內容可以參考《Hadoop in Action》。附錄D提供了一個通用的框架來實現復制連接。這個框架支持任意類型的InputFormat和OutputFormat的數據。(我們將在下一個技術中使用這個框架。)復制連接框架根據內存足跡的大小從分布式緩存的內容和輸入塊(input split)兩者中動態地決定需要緩存的對象。

如果所有的輸入數據集都不能夠小到可以放到緩存中,那有沒有辦法來優化map端連接呢?那就到了看半連接(semi-join)的時間了。

附錄D.2 復制連接框架 

復制連接是map端連接,得名于它的具體實現:連接中最小的數據集將會被復制到所有的map主機節點。復制連接的實現非常直接明了。更具體的內容可以參考Chunk Lam的《Hadoop in Action》。 

這個部分的目標是:創建一個可以支持任意類型的數據集的通用的復制連接框架。這個框架中提供了一個優化的小功能:動態監測分布式緩存內容和輸入塊的大小,并判斷哪個更大。如果輸入塊較小,那么你就需要將map的輸入塊放到內存緩沖中,然后在map的cleanup方法中執行連接操作了。 

圖D.4是這個框架的類圖,這里提供了連接類(GenericReplicatedJoin)的具體實現,而不僅僅是一個抽象類。在這個框架外,這個類將和KeyValueTextInputFormat及TextOutputFormat協作。它的一個假設前提是:每個數據文件的***個標記是連接鍵。此外,連接類也可以被繼承擴展來支持任意類型的輸入和輸出。

圖D.5是連接框架的算法。Map的setup方法判斷在map的輸入塊和分布式緩存中的內容哪個大。如果分布式緩存的內容比較小,那么它將被裝載到內存緩存中。然后在Map函數開始連接操作。如果輸入塊比較小,map函數將輸入塊的鍵\值對裝載到內存緩存中。Map的cleanup方法將從分布式緩存中讀取記錄,逐條記錄和在內存緩存中的鍵\值對進行連接操作。

 

以下代碼是GenericReplicatedJoin類中setup方法。它在map的初始化階段被調用的。這個方法判斷分布式緩存中的文件和輸入塊哪個大。如果文件比較小,則將文件裝載到HashMap中。

  1. @Override 
  2. protected void setup(Context context) 
  3.     throws IOException, InterruptedException { 
  4.      
  5.     distributedCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
  6.     int distCacheSizes = 0; 
  7.      
  8.     for (Path distFile : distributedCacheFiles) { 
  9.         File distributedCacheFile = new File(distFile.toString()); 
  10.         distCacheSizes += distributedCacheFile.length(); 
  11.     } 
  12.      
  13.     if(context.getInputSplit() instanceof FileSplit) { 
  14.         FileSplit split = (FileSplit) context.getInputSplit(); 
  15.         long inputSplitSize = split.getLength(); 
  16.         distributedCacheIsSmaller = (distCacheSizes < inputSplitSize); 
  17.     } else { 
  18.         distributedCacheIsSmaller = true
  19.     } 
  20.      
  21.     if (distributedCacheIsSmaller) { 
  22.         for (Path distFile : distributedCacheFiles) { 
  23.             File distributedCacheFile = new File(distFile.toString()); 
  24.             DistributedCacheFileReader reader = getDistributedCacheReader(); 
  25.             reader.init(distributedCacheFile); 
  26.              
  27.             for (Pair p : (Iterable<Pair>) reader) { 
  28.                 addToCache(p); 
  29.             } 
  30.              
  31.             reader.close(); 
  32.         } 
  33.     } 

根據setup方法是否將分布式緩存的內容裝載到內存的緩存中,Map方法將會有不同的行為。如果分布式緩存中的內容被裝載到內存中,那么map方法就將輸入塊的記錄和內存中的緩存做連接操作。如果分布式緩存中的內容沒有被裝載到內存中,那么map方法就將輸入塊的記錄裝載到內存中,然后在cleanup方法中使用。

  1. @Override 
  2. protected void map(Object key, Object value, Context context) 
  3.     throws IOException, InterruptedException { 
  4.     Pair pair = readFromInputFormat(key, value); 
  5.      
  6.     if (distributedCacheIsSmaller) { 
  7.         joinAndCollect(pair, context); 
  8.     } else { 
  9.         addToCache(pair); 
  10.     } 
  11.  
  12. public void joinAndCollect(Pair p, Context context) 
  13.     throws IOException, InterruptedException { 
  14.     List<Pair> cached = cachedRecords.get(p.getKey()); 
  15.      
  16.     if (cached != null) { 
  17.         for (Pair cp : cached) { 
  18.             Pair result; 
  19.              
  20.             if (distributedCacheIsSmaller) { 
  21.                 result = join(p, cp); 
  22.             } else { 
  23.                 result = join(cp, p); 
  24.             } 
  25.              
  26.             if (result != null) { 
  27.                 context.write(result.getKey(), result.getData()); 
  28.             } 
  29.         } 
  30.     } 
  31.  
  32. public Pair join(Pair inputSplitPair, Pair distCachePair) { 
  33.     StringBuilder sb = new StringBuilder(); 
  34.      
  35.     if (inputSplitPair.getData() != null) { 
  36.         sb.append(inputSplitPair.getData()); 
  37.     } 
  38.      
  39.     sb.append("\t"); 
  40.      
  41.     if (distCachePair.getData() != null) { 
  42.         sb.append(distCachePair.getData()); 
  43.     } 
  44.      
  45.     return new Pair<Text, Text>( 
  46.                 new Text(inputSplitPair.getKey().toString()), 
  47.                 new Text(sb.toString())); 

當所有的記錄都被傳輸給map方法后,MapReduce將會調用cleanup方法。如果分布式緩存中的內容比輸入塊大,連接將會在cleanup中進行。連接的對象是map函數的緩存中的輸入塊的記錄和分布式緩存中的記錄。

  1. @Override 
  2. protected void cleanup(Context context) 
  3.     throws IOException, InterruptedException { 
  4.      
  5.     if (!distributedCacheIsSmaller) { 
  6.      
  7.         for (Path distFile : distributedCacheFiles) { 
  8.             File distributedCacheFile = new File(distFile.toString()); 
  9.             DistributedCacheFileReader reader = getDistributedCacheReader(); 
  10.             reader.init(distributedCacheFile); 
  11.              
  12.             for (Pair p : (Iterable<Pair>) reader) { 
  13.                 joinAndCollect(p, context); 
  14.             } 
  15.          
  16.             reader.close(); 
  17.         } 
  18.     } 

***,作業的驅動代碼必須指定需要裝載到分布式緩存中的文件。以下的代碼可以處理一個文件,也可以處理MapReduce輸入結果的一個目錄。

  1. Configuration conf = new Configuration(); 
  2.  
  3. FileSystem fs = smallFilePath.getFileSystem(conf); 
  4. FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath); 
  5.  
  6. if(smallFilePathStatus.isDir()) { 
  7.     for(FileStatus f: fs.listStatus(smallFilePath)) { 
  8.         if(f.getPath().getName().startsWith("part")) { 
  9.             DistributedCache.addCacheFile(f.getPath().toUri(), conf); 
  10.         } 
  11.     } 
  12. else { 
  13.     DistributedCache.addCacheFile(smallFilePath.toUri(), conf); 

這個框架假設分布式緩存中的內容和輸入塊的內容都可以被裝載到內存中。它的優點在于兩個數據集之中較小的才會裝載到內存中。

在論文《A Comparison of Join Algorithms for Log Processing in MapReduce》中,針對對于分布式緩存中的內容較大時的場景對這個方法進行了更多的優化。在他們的優化中,他們將分布式緩存分成N個分區,并將輸入塊放入N個哈希表。然后在cleanup方法中的優化就更加高效。

在map端的復制連接的問題在于,map任務必須在啟動時讀取分布式緩存。上述論文提到的另一個優化方案是重載FileInputFormat的splitting。將存在于同一個主機上的輸入塊合并成一個塊。然后就可以減少需要裝載分布式緩存的map任務的個數了。

***一個說明,Hadoop在org.apache.hadoop.mapred.join包中自帶了map端的連接。但是它需要有序的待連接的數據集的輸入文件,并要求將其分發到相同的分區中。這樣就造成了繁重的預處理工作。

原文鏈接:http://www.cnblogs.com/datacloud/p/3579333.html

責任編輯:彭凡 來源: 博客園
相關推薦

2014-03-18 10:23:11

MapReduce

2017-06-23 22:00:13

MySqlsslcentos

2015-08-21 13:50:49

Oracle連接

2009-07-22 10:53:42

MySQL左連接

2010-05-10 15:48:37

Unix連接

2021-03-24 09:06:01

MySQL長連接短連接

2011-03-28 14:04:10

SQL左連接右連接

2018-06-06 11:01:25

HTTP長連接短連接

2011-06-01 13:54:10

MySQL

2015-04-23 18:46:38

TCPTCP協議

2010-11-11 13:51:36

SQL Server內

2010-01-04 09:51:52

ADO連接對象

2010-11-08 15:47:01

SQL Server外

2014-01-02 14:04:39

PostgreSQLPerl

2014-01-02 15:41:24

PostgreSQLPHP

2014-01-02 13:22:01

PythonPostgreSQL

2019-09-16 09:29:01

TCP全連接隊列半連接隊列

2010-06-07 15:24:34

Java連接MYSQL

2023-01-31 18:09:12

物聯網移動物聯網

2010-08-24 09:29:37

內連接全連接
點贊
收藏

51CTO技術棧公眾號

国产精品素人一区二区| 欧美精品观看| 欧美视频精品在线观看| 青娱乐一区二区| 亚洲永久精品视频| 伊人情人综合网| 日韩av中文在线| 爱情岛论坛成人| 怡红院在线播放| www.日韩在线| 国产日韩中文在线| 成年人免费看毛片| 欧美丝袜一区| 精品88久久久久88久久久| 中文字幕第80页| 男女在线视频| 中文字幕一区二区5566日韩| 国产精品一区二区在线观看| 一级黄色在线观看| 欧美精品导航| www.日韩免费| 日本人妻一区二区三区| 国产精品久久亚洲不卡| 亚洲嫩草精品久久| 日韩福利视频| 天堂成人在线视频| 国产美女视频91| 国产成人精品一区二区三区| 国产va在线播放| 色综合天天爱| 亚洲人成人99网站| japanese在线观看| 日韩中文字幕在线一区| 在线视频亚洲一区| 欧美在线观看成人| 人妖欧美1区| 亚洲素人一区二区| 亚洲美女网站18| 麻豆国产在线播放| 91日韩一区二区三区| www.成人av.com| 国产免费久久久| 精品一区二区三区久久| 国产精品老女人精品视频| 天堂а√在线中文在线新版 | 精品一区二区三区四区| 性猛交╳xxx乱大交| 99国内精品久久久久| 欧美日韩一区二区三区高清| 黑人粗进入欧美aaaaa| 亚洲欧美电影| 一本到不卡免费一区二区| 免费拍拍拍网站| 538在线观看| 亚洲成a人v欧美综合天堂| 佐佐木明希av| 羞羞视频在线观看免费| 亚洲精品v日韩精品| 中文字幕超清在线免费观看| 免费黄色在线| 亚洲综合区在线| 亚洲精品天堂成人片av在线播放| 二区三区在线观看| 亚洲综合网站在线观看| www.国产二区| 国产精品一二三产区| 亚洲成人av一区二区| 日韩国产欧美亚洲| 亚洲欧美韩国| 欧美日韩精品综合在线| 日本久久精品一区二区| 999日本视频| 一本大道久久a久久综合婷婷| 国产成人短视频在线观看| 水莓100在线视频| 人妻av无码一区二区三区| 日产精品久久久一区二区| 一本久久综合亚洲鲁鲁| 国产精品美女久久久久久2018| 国产麻豆一区二区三区| 久久99国产精品久久99| 91情侣偷在线精品国产| 国产黄色片免费| 成人精品一区二区三区四区| 国产伦精品一区二区三毛| 日韩资源在线| 亚洲国产高清在线| 色一情一乱一乱一区91| rebdb初裸写真在线观看| 欧美丝袜一区二区三区| 一区二区三区韩国| 在线日韩成人| 亚洲天堂久久av| 日韩影院一区二区| 国产亚洲永久域名| 成人啪啪免费看| 日韩一二三四| 亚洲免费伊人电影| 男人操女人免费| 国产乱码精品一区二区三区亚洲人| 亚洲成人av片| 香蕉久久久久久久| 亚洲国产专区校园欧美| 国产精品久久久久一区二区| 国产xxxx在线观看| 久久久无码精品亚洲日韩按摩| 一区二区三区免费看| 丁香花视频在线观看| 91久久人澡人人添人人爽欧美| 亚洲一区二区三区四区精品| 国产精品一区二区中文字幕 | 国产精品三级网站| 午夜av免费在线观看| 亚洲日本在线看| 宅男噜噜噜66国产免费观看| 成人台湾亚洲精品一区二区| 色阁综合伊人av| 不卡av电影在线| 99精品久久99久久久久| 久久久久久av无码免费网站下载| sis001欧美| 亚洲精品二三区| 国产在线视频99| 国产一区二区三区在线看麻豆 | 99久久99久久精品国产片桃花| 蜜臀av性久久久久蜜臀aⅴ四虎| 精品国产一区二区三区性色av| 精品国产伦一区二区三区| 欧美黑人猛猛猛| 免费无码一区二区三区| 中文字幕在线观看第三页| 国产日本欧美在线| 99在线观看视频网站| 久久影院资源网| 午夜久久久影院| 日韩电影大全在线观看| 91极品在线| 香蕉影视欧美成人| 国产xxx在线观看| 欧美oldwomenvideos| 午夜伦理精品一区| 亚洲第一黄色片| 亚洲女人小视频在线观看| 日本男女交配视频| 亚洲精品黑牛一区二区三区| 色偷偷噜噜噜亚洲男人| 黄色大片网站在线观看| 成人免费av在线| 成人小视频在线观看免费| 成人在线中文| 亚洲一区二区久久久| 天堂在线免费观看视频| 不卡电影一区二区三区| 日本国产中文字幕| 精品国产鲁一鲁****| 欧美大码xxxx| 亚洲国产日韩在线观看| 亚洲欧美视频在线观看| 亚洲一二三av| 男女av免费观看| 日韩欧美视频一区二区| 视频一区二区三区在线看免费看| 亚洲欧美日本在线| 国产精品久久久久久久99| 欧美国产美女| 成人网在线免费看| 毛片在线看网站| 日韩欧美国产精品综合嫩v| 国产精品水嫩水嫩| 欧美黑人性猛交| 日韩亚洲第一页| 亚洲国产欧美日韩| 亚洲第一会所| 尤物yw午夜国产精品视频| 欧美亚洲日本一区二区三区| 国产精品探花在线播放| 日韩高清在线观看一区二区| 在线观看亚洲视频| 一卡二卡三卡在线| 精品亚洲免费视频| 欧美激情2020午夜免费观看| www.国产黄色| 午夜精品aaa| 偷拍夫妻性生活| 久久www免费人成看片高清| 午夜探花在线观看| 日韩一区免费| 2019av中文字幕| av在线免费一区| 欧美一区二区三区四区五区| 精品在线视频观看| 国产精品丝袜一区| 亚洲av午夜精品一区二区三区| 精品69视频一区二区三区Q| 久久久久免费网| 国产激情欧美| 2018日韩中文字幕| 免费观看久久久久| 精品成人佐山爱一区二区| 秋霞精品一区二区三区| 国产精品素人视频| 亚洲中文字幕一区| 免费在线一区观看| 欧美图片激情小说| 精品理论电影| 成人福利网站在线观看11| 国产一二三在线| 这里只有精品久久| 人妻少妇精品无码专区| 欧美在线制服丝袜| 欧美人与禽zozzo禽性配| 青青青青在线| 久久99精品久久久久| 国产系列第一页| 欧美美女啪啪| 91黄在线观看| 成人在线视频免费| 97超碰色婷婷| 国产理论在线观看| 一本一本久久a久久精品牛牛影视| 99精品免费观看| 欧美视频在线观看免费网址| 欧美成人777| 久久久久国产精品麻豆| 91蝌蚪视频在线| 老色鬼久久亚洲一区二区| 日本a级片在线播放| 欧美亚洲在线日韩| 久精品国产欧美| 成人三级毛片| 91免费版黄色| 综合久久av| 国产成人亚洲精品| 亚洲性图自拍| 久久成人一区二区| 137大胆人体在线观看| 日韩激情av在线免费观看| 国产视频一二三四区| 欧美性69xxxx肥| 亚洲婷婷综合网| 亚洲大型综合色站| 69av视频在线| 国产精品福利电影一区二区三区四区| 日本污视频网站| 国产亚洲1区2区3区| 中文字幕第3页| 成人国产精品免费观看| 四川一级毛毛片| 国产不卡高清在线观看视频| 涩多多在线观看| 国产在线一区观看| 91小视频在线播放| 国产盗摄一区二区三区| 99精品999| 老司机午夜精品| 亚洲黄色小视频在线观看| 最新国产乱人伦偷精品免费网站| 国产无限制自拍| 亚洲黄色在线| 亚洲美免无码中文字幕在线| 亚洲视频一区| 日韩精品一区在线视频| 黄色精品网站| 国产青青在线视频| 一区二区国产在线观看| 狠狠噜天天噜日日噜| 亚洲黄色高清| 看av免费毛片手机播放| 亚洲自拍另类| 91激情视频在线| 国产精品自拍三区| 丰满人妻一区二区三区免费视频棣 | 欧美日韩精品欧美日韩精品| 日韩xxx视频| 欧美日韩二区三区| 国产av无码专区亚洲av麻豆| 精品免费视频一区二区| 无码h黄肉3d动漫在线观看| 亚洲精品久久久久久下一站 | 久久99国产精品麻豆| 中文字幕一区二区在线观看视频| 粉嫩av一区二区三区在线播放 | 99久久婷婷国产综合精品电影| 午夜久久久久久久| 久久久久久9999| 国产67194| 亚洲成人av一区二区三区| 色av性av丰满av| 91精品在线免费| 亚洲美女综合网| 日韩精品在线视频| 五月婷婷在线观看| 欧美国产视频一区二区| 成人免费网站视频| 国产精品成人一区| 免费一级欧美在线大片| 激情视频在线观看一区二区三区| 亚洲裸色大胆大尺寸艺术写真| 国产日本一区二区三区| 青青草国产免费一区二区下载| 永久免费在线看片视频| aa国产精品| 精品亚洲视频在线| 国产日韩高清在线| 黄色一级视频免费观看| 欧美色另类天堂2015| 欧美日韩 一区二区三区| 亚洲福利精品在线| 中文字幕在线播放| 91av视频在线播放| 婷婷激情成人| 日韩美女一区| 在线日韩中文| 男人的天堂最新网址| 国产三级精品三级在线专区| 精品国产欧美日韩不卡在线观看 | 欧美成人高潮一二区在线看| 青青青伊人色综合久久| theporn国产精品| 国产精品麻豆欧美日韩ww| 日韩精品视频播放| 欧美精品乱人伦久久久久久| 亚洲国产精品国自产拍久久| www亚洲精品| 丝袜美腿诱惑一区二区三区| 91精品国产高清久久久久久91裸体| 婷婷综合成人| 日韩中文字幕在线视频观看| 国产一区二区三区在线观看精品| 中文字幕免费看| 亚洲综合男人的天堂| 亚洲成a人片77777精品| 中文字幕av一区二区| 国产中文在线播放| 亚洲一区亚洲二区亚洲三区| 狠狠做六月爱婷婷综合aⅴ| 久久综合久久网| 精久久久久久久久久久| 久久爱一区二区| 欧美网站大全在线观看| 四虎影视在线播放| 九九精品视频在线| 日本免费精品| 男人的天堂成人| 久久电影网电视剧免费观看| 午夜激情福利电影| 欧美日韩欧美一区二区| 男女网站在线观看| 欧美一区深夜视频| 欧洲专线二区三区| 日韩一级免费在线观看| 久久综合久久综合九色| 日韩黄色一级大片| 亚洲国产成人爱av在线播放| 宅男网站在线免费观看| 91在线网站视频| 1024日韩| 北岛玲一区二区| 黑人巨大精品欧美一区二区免费| 日本私人网站在线观看| 欧洲成人在线观看| 九九热爱视频精品视频| 免费黄色特级片| 国产精品不卡一区| 亚洲综合精品视频| 美女999久久久精品视频| 成人在线啊v| 男人天堂成人网| 国产91精品精华液一区二区三区| 青青操国产视频| 亚洲欧美国产va在线影院| 欧美片第一页| 亚洲精品在线免费看| 国产乱人伦偷精品视频免下载| 久久久久久久福利| 亚洲国产精品yw在线观看| 欧美另类tv| 日本在线播放不卡| 精品一区二区三区免费| 欧美精品久久久久性色| 亚洲欧洲一区二区三区在线观看| 天天综合网天天| 一区精品视频| 99久久免费国产| 最好看的日本字幕mv视频大全| 久久九九全国免费精品观看| www.久久热| 国产精品又粗又长| 国产女同互慰高潮91漫画| 夜夜躁很很躁日日躁麻豆| 国产69精品久久久久99| 久久不见久久见国语| 天天看片天天操| 色综合欧美在线| 久操视频在线免费播放| 国产精品久久亚洲7777| 久久精品国内一区二区三区| 免费中文字幕视频| 亚洲午夜久久久久久久| 亚洲国产欧美国产第一区|