Hadoop 2.0 Yarn代碼:心跳驅(qū)動(dòng)服務(wù)分析
主要涉及的java文件
hadoop-yarn-server-resourcemanager下的包
org.apache.hadoop.yarn.server.resourcemanager
ResourceTrackerService.java
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
FifoScheduler.java
org.apache.hadoop.yarn.server.resourcemanager.rmnode
RMNodeImpl.java
hadoop-yarn-server-nodemanager下的包
org.apache.hadoop.yarn.server.nodemanager
NodeStatusUpdaterImpl.java
2.代碼分析
各個(gè)服務(wù)代碼已經(jīng)啟動(dòng),NodeStatusUpdate啟動(dòng)后開始驅(qū)動(dòng)整個(gè)Hadoop運(yùn)行
1).NodeStatusUpdaterImpl(NodeManager端):
NodeStatusUpdaterImpl一經(jīng)被啟動(dòng),start()函數(shù)被調(diào)用,進(jìn)行Hadoop RPC服務(wù)端的初始化操作(調(diào)用getServer函數(shù)創(chuàng)建服務(wù)等等)。
start()函數(shù)主要依次調(diào)用registerWithRM()函數(shù)和startStatusUpdater()函數(shù)
registerWithRM()函數(shù)
設(shè)置必要配置信息,和安全認(rèn)證操作
利用Hadoop RPC遠(yuǎn)程調(diào)用RM端ResourcesTrackerService下的registerNodeManager()方法,詳細(xì)見后面ResourcesTrackerService下的registerNodeManager()代碼分析
startStatusUpdater()函數(shù)
創(chuàng)建一個(gè)線程,然后啟動(dòng),所有操作都在運(yùn)行while的循環(huán)中
設(shè)置、獲取和輸出必要配置信息,其中比較重要的調(diào)用getNodeStatus()方法,獲取本地Container和本地Node的狀態(tài),以供后面的nodeHeartbeat()方法使用
通過Hadoop RPC遠(yuǎn)程調(diào)用RM端ResourcesTrackerService下的nodeHeartbeat()函數(shù),用while循環(huán)以一定時(shí)間間隔向RM發(fā)送心跳信息,心跳操作見下面ResourcesTrackerService下nodeHeartbeat()函數(shù)
nodeHeartbeat()將返回給NM信息,根據(jù)返回的response,根據(jù)response返回的信息標(biāo)記不需要的Container和Application發(fā)送相應(yīng)的FINISH_CONTAINERS和FINISH_APPS給ContainerManager,進(jìn)行清理操作----詳細(xì)見后面的代碼分析
2).ResourceTrackerService(ResourcesManager端):
ResourceTrackerService開頭與NodeStatusUpdaterImpl相似,start()函數(shù)被調(diào)用,初始化Hadoop RPC服務(wù)端,等待遠(yuǎn)程來調(diào)用ResourceTrackerService中的函數(shù)
接上面的NodeStatusUpdaterImpl中對registerNodeManager()和nodeHeartbeat()的Hadoop RPC調(diào)用,詳細(xì)調(diào)用細(xì)節(jié)見下文
以下分成主要從兩個(gè)函數(shù)registerNodeManager()和nodeHeartbeat()開始分析,所以分成兩部分---
第一部分:
1).接前文ResourceTrackerService下的registerNodeManager()函數(shù)
首先獲取本地的NodeID,還有相應(yīng)的主機(jī)名、端口、請求資源信息。
進(jìn)行安全認(rèn)證等輔助操作,檢查NodeID所標(biāo)記的Node是否"有效".如果“無效”的話,立即返回
Node“有效”說明此Node可用,于是創(chuàng)建RMNode(new RMNodeImpl)來識(shí)別這個(gè)Node的狀態(tài)和監(jiān)測在這個(gè)Node上運(yùn)行的Container和Application
判斷其是否為新RMNode,如果是則向其發(fā)送RMNodeEventType.STARTED
如果不是新的RMNode,則發(fā)送RMNodeEventType.RECONNECTED到RMNode,重新連接Node,見附加代碼分析。
最后返回給調(diào)用方操作結(jié)果。
2).RMNodeImpl:當(dāng)接收RMNodeEventType.STARTED后(接1)),發(fā)生狀態(tài)轉(zhuǎn)移NodeState(NEW→RUNNING),Transition函數(shù)被調(diào)用
向調(diào)度器(FifoScheduler)發(fā)送NODE_ADDED。
判斷這個(gè)Node是否Inactive,如果在Inactive中則,則先將這個(gè)Node移除出Inactive,否則增加ActiveNodes的數(shù)目。
3).FifoScheduler:接受NODE_ADDED事件,調(diào)用addNode()函數(shù),向RM報(bào)告新添加的Node的狀態(tài)
addNode函數(shù)被調(diào)用,首先將接收到的RMNode的NodeID和其相應(yīng)新創(chuàng)建的SchedulerNode(包含對資源的各種操作)放在ConcurrentHashMap類型的node對象中。
再調(diào)用Resources下的addTo()函數(shù),累加Node的資源數(shù)量,來計(jì)算集群中擁有的資源數(shù)量
至此NM端的Node已經(jīng)添加到RM的管轄范圍下,NM成功注冊到RM
附加代碼分析
附加2).RMNodeImpl:當(dāng)RMNode接收RMNodeEventType.RECONNECTED(接1)),則保持當(dāng)前狀態(tài)不變(RUNNING或者UNHEALTHY),Transition函數(shù)被調(diào)用
首先向調(diào)度器(FifoScheduler)發(fā)送NODE_REMOVED事件,刪除當(dāng)前Node
然后重新連接操作,如果新連接的Node與上一次斷開的Node為同一個(gè),則直接向調(diào)度器發(fā)送NODE_ADDED事件,如果兩個(gè)Node不是同一個(gè),則更新關(guān)于Node的參數(shù),再將新的Node加入ConcurrentHashMap類型的node對象中(與前面FifoScheduler中的是同一個(gè))
最后向新的RMNode發(fā)送RMNodeEventType.STARTED
附加3).FifoScheduler:接到NODE_REMOVED事件,調(diào)用removeNode()函數(shù)
removeNode()函數(shù)中,先將此Node上的Container全部Kill掉,通過發(fā)送RMContainerEventType.KILL實(shí)現(xiàn),因?yàn)楝F(xiàn)在討論沒有Job運(yùn)行,所以沒有Container可以Kill
從nodes中移出此Node,最后計(jì)算集群資源,將相應(yīng)Node的資源數(shù)量從集群資源總量扣除,完畢
第二部分
1).接前文ResourceTrackerService下的nodeHeartbeat()函數(shù),各個(gè)NM已經(jīng)注冊到RM上,則各個(gè)NM開始調(diào)用這個(gè)函數(shù)向RM發(fā)送“心跳”保持聯(lián)系,另外這里討論的是未提交Job的情況下
獲取所需操作的參數(shù)變量,例如NodeStatus、NodeId等
驗(yàn)證發(fā)送這次“心跳的”NM是否已經(jīng)注冊到RM,若未注冊則返回給NM讓其進(jìn)行重新(reboot)注冊到RM中(實(shí)際上就是讓NodeStatusUpdater跳過此次循環(huán))。
驗(yàn)證這個(gè)NM是否“有效”(有效:主機(jī)隊(duì)列包含這個(gè)NM或者黑名單沒有這個(gè)NM),如“無效”,則發(fā)送RMNodeEventType.DECOMMISSION到NM相應(yīng)的RMNode中,并關(guān)閉(shutdown)這個(gè)NM---下接附加2)
驗(yàn)證這次“心跳”是否與上一個(gè)“心跳”重復(fù)或者是不是新的“心跳”,這個(gè)通過心跳ID實(shí)現(xiàn),如果重復(fù)則輸出心跳重復(fù)信息,并且直接返回,如果不是新的心跳,則向RMNode發(fā)送RMNodeEventType.REBOOTING,然后返回reboot,讓NM“重啟”(和上面一樣NodeStatusUpdater跳過當(dāng)此次循環(huán))---下接附加2)
設(shè)置新的“心跳”ID,獲取Container和Application的信息
向RMNode發(fā)送包含STATUS_UPDATE和Container、Application等信息的RMNodeStatusEvent,然后返回相應(yīng)設(shè)置好的response給調(diào)用者。
2).RMNodeImpl:RMNode接收到包含STATUS_UPDATE和Container、Application等信息的RMNodeStatusEvent,RMNodeImpl狀態(tài)遷移NodeState(RUNNING→UNHEALTHY或RUNNING→RUNNING),Transition函數(shù)被調(diào)用
首先從RMNodeStatusEvent獲得必要的變量,然后判斷相應(yīng)的Node的“健康”情況,如果出現(xiàn)問題,則向調(diào)度器NODE_REMOVED,則調(diào)度器將此NM從集群管理刪除(詳見第一部分 附加3)),然后發(fā)送NODE_UNUSABLE到NodeListManager,將其RMNode放到“unusable”的set集合當(dāng)中,此時(shí)RMNodeImpl的NodeState(RUNNING→UNHEALTHY)
如果“健康” 則順利運(yùn)行,獲取NM遠(yuǎn)程傳過來的關(guān)于Container的信息(是在NM端用Hadoop RPC調(diào)用nodeHeartbeat()時(shí)傳送過來的),
說明:
由于這個(gè)地方討論的時(shí)候,無Job提交過來,NM端無Container啟動(dòng),NM發(fā)送到RM的事件里面的裝有Container狀態(tài)的List為空,所以只傳送“心跳” 表明NM的活動(dòng)信息,并沒有實(shí)際處理,RM端也無Application處理,接受“心跳”只會(huì)向RMNode發(fā)送RMNodeCleanContainerEvent事件,清理可能互動(dòng)的Container(對應(yīng)的代碼見FifoScheduler下的containerLaunchedOnNode函數(shù))。若詳見處理情況的運(yùn)行狀態(tài),參見后面的文章:RM與NM代碼_心跳驅(qū)動(dòng)服務(wù)分析_2 Container的配置和分配(Job提交)一篇。此時(shí)RMNodeImpl的NodeState(RUNNING→RUNNING)
到這為止,RM-NM端的代碼已經(jīng)啟動(dòng)完成,RM和NM之間以一定的時(shí)間間隔用心跳交互信息,等待Job的提交
附加代碼分析
附加2)RMNodeImpl:當(dāng)RMNode接收RMNodeEventType.DECOMMISSION),發(fā)生狀態(tài)轉(zhuǎn)移NodeState(RUNNING→DECOMMISSIONED),Transition函數(shù)被調(diào)用,
將DECOMMISSIONED設(shè)置為finalState
當(dāng)接到RMNodeEventType.REBOOTING情況類似,最后將REBOOTING設(shè)置為finalState。
分析如下圖,其中白色線為第一部分,初始NM注冊到RM階段,黃色線為第二部分,NM發(fā)送“心跳”信息到RM階段

原文鏈接:http://www.cnblogs.com/biyeymyhjob/archive/2012/08/21/2648026.html
【編輯推薦】
- 小白學(xué)數(shù)據(jù)分析:怎么做流失分析
- 小白學(xué)數(shù)據(jù)分析之K-means理論篇
- 小白學(xué)數(shù)據(jù)分析之從購買記錄分析道具支付環(huán)節(jié)
- 小白學(xué)數(shù)據(jù)分析之付費(fèi)滲透率
- 小白學(xué)數(shù)據(jù)分析之Excel制作INFOGRAPHIC






















