海量數(shù)據(jù)的二度人脈挖掘算法(Hadoop 實(shí)現(xiàn))
最近做了一個(gè)項(xiàng)目,要求找出二度人脈的一些關(guān)系,就好似新浪微博的“你可能感興趣的人” 中,間接關(guān)注推薦;簡(jiǎn)單描述:即你關(guān)注的人中有N個(gè)人同時(shí)都關(guān)注了 XXX 。
在程序的實(shí)現(xiàn)上,其實(shí)我們要找的是:若 User1 follow了10個(gè)人 {User3,User4,User5,... ,User12}記為集合UF1,那么 UF1中的這些人,他們也有follow的集合,分別是記為: UF3(User3 follow的人),UF4,UF5,...,UF12;而在這些集合肯定會(huì)有交集,而由最多集合求交產(chǎn)生的交集,就是我們要找的:感興趣的人。
我在網(wǎng)上找了些,關(guān)于二度人脈算法的實(shí)現(xiàn),大部分無(wú)非是通過(guò)廣度搜索算法來(lái)查找,猶豫深度已經(jīng)明確了2以?xún)?nèi);這個(gè)算法其實(shí)很簡(jiǎn)單,***步找到你關(guān)注的人;第二步找到這些人關(guān)注的人,***找出第二步結(jié)果中出現(xiàn)頻率***的一個(gè)或多個(gè)人,即完成。
但如果有***別的用戶(hù),那在運(yùn)算時(shí),就肯定會(huì)把這些用戶(hù)的follow 關(guān)系放到內(nèi)存中,計(jì)算的時(shí)候依次查找;先說(shuō)明下我沒(méi)有明確的診斷對(duì)比,這樣做的效果一定沒(méi) 基于hadoop實(shí)現(xiàn)的好;只是自己,想用hadoop實(shí)現(xiàn)下,最近也在學(xué);若有不足的地方還請(qǐng)指點(diǎn)。
首先,我的初始數(shù)據(jù)是文件,每一行為一個(gè)follow 關(guān)系 ida+‘\t’+idb;表示 ida follow idb。其次,用了2個(gè)Map/Reduce任務(wù)。
Map/Reduce 1:找出 任意一個(gè)用戶(hù) 的 follow 集合與 被 follow 的集合。如圖所示:

代碼如下:
Map任務(wù): 輸出時(shí) key :間接者 A 的ID ,value:follow 的人的ID 或 被follow的人的ID
- public void map(Text key, IntWritable values, Context context) throws IOException,InterruptedException{
- int value = values.get();
- //切分出兩個(gè)用戶(hù)id
- String[] _key = Separator.CONNECTORS_Pattern.split(key.toString());
- if(_key.length ==2){
- //"f"前綴表示 follow;"b" 前綴表示 被follow
- context.write(new Text(_key[0]), new Text("f"+_key[1]));
- context.write(new Text(_key[1]), new Text("b"+_key[0]));
- }
- }
Reduce任務(wù): 輸出時(shí) key :間接者 A 的ID , value為 兩個(gè)String,***個(gè)而follow的所有人(用分割符分割),第二個(gè)為 被follow的人(同樣分割)
- protected void reduce(Text key, Iterable<TextPair> pairs, Context context)
- throws IOException,InterruptedException{
- StringBuilder first_follow = new StringBuilder();
- StringBuilder second_befollow = new StringBuilder();
- for(TextPair pair: pairs){
- String id = pair.getFirst().toString();
- String value = pair.getSecond().toString();
- if(id.startsWith("f")){
- first_follow.append(id.substring(1)).append(Separator.TABLE_String);
- } else if(id.startsWith("b")){
- second_befollow.append(id.substring(1)).append(Separator.TABLE_String);
- }
- }
- context.write(key, new TextPair(first_follow.toString(),second_befollow.toString()));
- }
其中Separator.TABLE_String為自定義的分隔符;TextPair為自定義的 Writable 類(lèi),讓一個(gè)key可以對(duì)應(yīng)兩個(gè)value,且這兩個(gè)value可區(qū)分。
Map/Reduce 2:在上一步關(guān)系中,若B follow A,而 A follow T ,則可以得出 T 為 B 的二度人脈,且 間接者為A ,于是找出 相同二度人脈的不同間接人。如圖所示:

代碼如下:
Map 任務(wù):輸出時(shí) key 為 由兩個(gè)String 記錄的ID表示的 二度人脈關(guān)系,value 為 這個(gè)二度關(guān)系產(chǎn)生的間接人的ID
- public void map(Text key, TextPair values, Context context) throws IOException,InterruptedException{
- Map<String, String> first_follow = new HashMap<String, String>();
- Map<String, String> second_befollow = new HashMap<String, String>();
- String _key = key.toString();
- String[] follow = values.getFirst().toString().split(Separator.TABLE_String);
- String[] second = values.getSecond().toString().split(Separator.TABLE_String);
- for(String sf : follow){
- first_follow.put(sf , _key );
- }
- for(String ss : second){
- second_befollow.put(ss , _key );
- }
- for(Entry<String, String> f : first_follow.entrySet()){
- for(Entry<String, String> b : second_befollow.entrySet()){
- context.write(new TextPair(f.getKey() ,b.getKey()), new Text(key));
- }
- }
- }
Reduce任務(wù):輸出時(shí) key 仍然為二度人脈關(guān)系, value 為所有間接人 的ID以逗號(hào)分割。
- protected void reduce(TextPair key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
- StringBuilder resutl = new StringBuilder();
- for (Text text : values){
- resutl.append(text.toString()).append(",");
- }
- context.write(key, new Text(resutl.toString()));
- }
到這步,二度人脈關(guān)系基本已經(jīng)挖掘出來(lái),后續(xù)的處理就很簡(jiǎn)單了,當(dāng)然也基于二度人脈挖掘三度,四度:)





















