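Nacos Source Code, Part 8: Analysis of the Nacos gRPC Upgrade (III)
Outline
7. Server-side health checks on service instances
8. How service deregistration removes the registry, client, and related information
9. Source code analysis of the event-driven architecture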
7. Server-side health checks on service instances
(1) Design logic of the server-side health check on service instances
(2) Source code of the server-side health check on service instances
(3) Deregistration after the server finds a service instance unhealthy
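(1) Design logic of the server-side health check on service instances
Step 1. The server first obtains the Connection objects of all clients
Each Connection object has a lastActiveTime property, which records when the connection was last active.
Step 2. It then checks whether the current time minus lastActiveTime is at least 20s
If so, the connectionId of that Connection object is added to a Set named outDatedConnections, which holds the candidates for removal. The Connection object is not deleted at this point.
Step 3. Once every Connection object has been checked, the server iterates over outDatedConnections
It sends a request to each Connection object in the set to confirm whether the client has really gone offline. If the response succeeds, the connectionId is added to the successConnections set and the lastActiveTime of the Connection object is refreshed. This mechanism is known as liveness probing (client active detection).
Step 4. The server iterates over the removal candidates and, before unregistering each one, checks whether its probe succeeded
That is, if a connectionId is in the removal set outDatedConnections but not in the probe-success set successConnections, the corresponding client is unregistered.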
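The four steps above can be sketched in a few lines of plain Java. This is a simplified, single-threaded illustration and not the Nacos implementation: the class HealthCheckSketch, the sweep() method, and the probe predicate are hypothetical names introduced here, and a map of connectionId to lastActiveTime stands in for the real Connection objects.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified model of one health-check round (not Nacos code).
class HealthCheckSketch {
    // Threshold taken from the text: 20 seconds without activity.
    static final long KEEP_ALIVE_MILLIS = 20_000L;

    // connectionId -> lastActiveTime in millis; stands in for the Connection objects.
    final Map<String, Long> lastActiveTime = new LinkedHashMap<>();

    /**
     * Runs one round and returns the connectionIds that were unregistered.
     * The probe predicate is an assumed stand-in for the server-to-client request.
     */
    Set<String> sweep(long now, Predicate<String> probe) {
        // Steps 1 and 2: collect connections idle for at least the threshold.
        Set<String> outDatedConnections = new HashSet<>();
        for (Map.Entry<String, Long> entry : lastActiveTime.entrySet()) {
            if (now - entry.getValue() >= KEEP_ALIVE_MILLIS) {
                outDatedConnections.add(entry.getKey());
            }
        }
        // Step 3: probe each candidate; a success refreshes lastActiveTime.
        Set<String> successConnections = new HashSet<>();
        for (String id : outDatedConnections) {
            if (probe.test(id)) {
                lastActiveTime.put(id, now);
                successConnections.add(id);
            }
        }
        // Step 4: unregister only the candidates whose probe did not succeed.
        Set<String> removed = new HashSet<>();
        for (String id : outDatedConnections) {
            if (!successConnections.contains(id)) {
                lastActiveTime.remove(id);
                removed.add(id);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        long now = 100_000L;
        HealthCheckSketch server = new HealthCheckSketch();
        server.lastActiveTime.put("c1", now - 1_000L);  // recently active
        server.lastActiveTime.put("c2", now - 25_000L); // idle, but answers the probe
        server.lastActiveTime.put("c3", now - 30_000L); // idle and silent
        Set<String> removed = server.sweep(now, id -> id.equals("c2"));
        System.out.println("unregistered = " + removed);
        System.out.println("remaining = " + server.lastActiveTime.keySet());
    }
}
```

In this run only c3 is unregistered: c1 never becomes a candidate, and c2 becomes one but is rescued by a successful probe, which is the point of the two-phase design.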
(2) Source code of the server-side health check on service instances
The entry point of the health check on service instances is the start() method of ConnectionManager.
@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
    Map<String, Connection> connections = new ConcurrentHashMap<>();
    …
    //Start Task:Expel the connection which active Time expire.
    @PostConstruct
    public void start() {
        //Start UnHealthy Connection Expel Task.
        RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                ...
                //Step 1. Get all connections
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                ...
                //Step 2. Check whether the client has sent no heartbeat for 20s or more; if so, add its connectionId to outDatedConnections
                Set<String> outDatedConnections = new HashSet<>();
                long now = System.currentTimeMillis();
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    String clientIp = client.getMetaInfo().getClientIp();
                    AtomicInteger integer = expelForIp.get(clientIp);
                    if (integer != null && integer.intValue() > 0) {
                        integer.decrementAndGet();
                        expelClient.add(client.getMetaInfo().getConnectionId());
                        expelCount--;
                    } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {//compare against the last active time
                        //Add to the removal candidates
                        outDatedConnections.add(client.getMetaInfo().getConnectionId());
                    }
                }
                ...
                //client active detection.
                //Step 3. Connections found idle for 20s or more are not deleted immediately; the server first probes the client to confirm whether it is really offline
                Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
                if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                    Set<String> successConnections = new HashSet<>();
                    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                    //Iterate over the connectionIds of clients with no heartbeat for 20s or more
                    for (String outDateConnectionId : outDatedConnections) {
                        try {
                            Connection connection = getConnection(outDateConnectionId);
                            if (connection != null) {
                                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                //Call GrpcConnection.asyncRequest() to send the request asynchronously
                                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                    @Override
                                    public Executor getExecutor() {
                                        return null;
                                    }
                                    @Override
                                    public long getTimeout() {
                                        return 1000L;
                                    }
                                    @Override
                                    public void onResponse(Response response) {
                                        latch.countDown();
                                        if (response != null && response.isSuccess()) {
                                            //On a successful response, refresh the last active time
                                            connection.freshActiveTime();
                                            //and add the connection to the probe-success set
                                            successConnections.add(outDateConnectionId);
                                        }
                                    }
                                    @Override
                                    public void onException(Throwable e) {
                                        latch.countDown();
                                    }
                                });
                                Loggers.REMOTE_DIGEST.info("[{}]send connection active request ", outDateConnectionId);
                            } else {
                                latch.countDown();
                            }
                        } catch (ConnectionAlreadyClosedException e) {
                            latch.countDown();
                        } catch (Exception e) {
                            Loggers.REMOTE_DIGEST.error("[{}]Error occurs when check client active detection ,error={}", outDateConnectionId, e);
                            latch.countDown();
                        }
                    }
                    latch.await(3000L, TimeUnit.MILLISECONDS);
                    Loggers.REMOTE_DIGEST.info("Out dated connection check successCount={}", successConnections.size());
                    //Step 4. Connections that still failed the probe are now removed
                    //Iterate over the clients with no heartbeat for 20s or more and prepare to remove their information
                    for (String outDateConnectionId : outDatedConnections) {
                        //Skip removal if the probe succeeded
                        if (!successConnections.contains(outDateConnectionId)) {
                            Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId);
                            //Run the client unregister logic
                            unregister(outDateConnectionId);
                        }
                    }
                }
                ...
            }
        }, 1000L, 3000L, TimeUnit.MILLISECONDS);
    }
    ...
}
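The probing step relies on a common pattern: fire the requests asynchronously, count every outcome down on a CountDownLatch, and wait with an upper bound so that one silent client cannot stall the whole round. The sketch below isolates that pattern under stated assumptions. ProbeLatchSketch, probeAll(), and the isAlive predicate are hypothetical names, and a thread pool stands in for the gRPC round trip. Unlike the quoted listing, this sketch collects successes in a concurrent set and records a success before counting down; that is one cautious variant, not a description of how Nacos behaves.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical illustration of "probe asynchronously, wait with a timeout" (not Nacos code).
class ProbeLatchSketch {
    /**
     * Probes every id on a worker pool and waits at most timeoutMillis for the answers.
     * Ids that have not answered by then are treated as failed.
     */
    static Set<String> probeAll(Set<String> ids, Predicate<String> isAlive, long timeoutMillis) {
        // Results are written from worker threads, so a concurrent set is used here.
        Set<String> success = ConcurrentHashMap.newKeySet();
        if (ids.isEmpty()) {
            return new HashSet<>();
        }
        CountDownLatch latch = new CountDownLatch(ids.size());
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (String id : ids) {
                pool.execute(() -> {
                    try {
                        if (isAlive.test(id)) {
                            success.add(id); // record the result before releasing the latch
                        }
                    } finally {
                        latch.countDown(); // count down on success, failure, and exception alike
                    }
                });
            }
            // Bounded wait: returns early if all probes finish, otherwise after the timeout.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return new HashSet<>(success); // snapshot of the answers received in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new HashSet<>(success);
        } finally {
            pool.shutdownNow(); // interrupt probes that are still waiting
        }
    }

    public static void main(String[] args) {
        Set<String> candidates = new HashSet<>(Arrays.asList("c2", "c3"));
        // c3 simulates a client that never answers within the timeout.
        Set<String> alive = probeAll(candidates, id -> {
            if (id.equals("c3")) {
                try {
                    Thread.sleep(5_000L);
                } catch (InterruptedException e) {
                    return false;
                }
            }
            return true;
        }, 300L);
        System.out.println("probe succeeded for " + alive);
    }
}
```

Counting down in a finally block mirrors the listing's habit of calling latch.countDown() on every path (response, exception, missing connection), which is what keeps the bounded wait from always running to its full timeout.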
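(3) Deregistration after the server finds a service instance unhealthy
Deregistration is handled by the unregister() method of ConnectionManager. This method removes the Connection object, clears some related data, and publishes a ClientDisconnectEvent (client deregistration event).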
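The shape of such an unregister step (remove the connection, clean up bookkeeping, notify listeners) might look like the sketch below. Everything in it is an assumption made for illustration: UnregisterSketch, Conn, the per-IP counter used as an example of "related data", and the disconnectListeners list that stands in for event publishing are hypothetical and are not taken from the Nacos source.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration of an unregister step (not Nacos code).
class UnregisterSketch {
    // Minimal stand-in for a connection: only the fields this sketch needs.
    static final class Conn {
        final String connectionId;
        final String clientIp;
        volatile boolean closed;

        Conn(String connectionId, String clientIp) {
            this.connectionId = connectionId;
            this.clientIp = clientIp;
        }
    }

    final Map<String, Conn> connections = new ConcurrentHashMap<>();
    // Assumed example of bookkeeping data that has to be cleared on unregister.
    final Map<String, AtomicInteger> connectionsPerIp = new ConcurrentHashMap<>();
    // Stand-in for event publishing: each listener receives the connectionId.
    final List<Consumer<String>> disconnectListeners = new CopyOnWriteArrayList<>();

    synchronized void register(Conn conn) {
        connections.put(conn.connectionId, conn);
        connectionsPerIp.computeIfAbsent(conn.clientIp, ip -> new AtomicInteger()).incrementAndGet();
    }

    synchronized void unregister(String connectionId) {
        // 1. Remove the connection; a second call for the same id is a no-op.
        Conn removed = connections.remove(connectionId);
        if (removed == null) {
            return;
        }
        // 2. Clear related data: drop the per-IP counter once it reaches zero.
        AtomicInteger count = connectionsPerIp.get(removed.clientIp);
        if (count != null && count.decrementAndGet() <= 0) {
            connectionsPerIp.remove(removed.clientIp);
        }
        removed.closed = true;
        // 3. Notify listeners, standing in for publishing a disconnect event.
        for (Consumer<String> listener : disconnectListeners) {
            listener.accept(connectionId);
        }
    }

    public static void main(String[] args) {
        UnregisterSketch manager = new UnregisterSketch();
        manager.disconnectListeners.add(id -> System.out.println("disconnect event for " + id));
        manager.register(new Conn("conn-1", "10.0.0.1"));
        manager.unregister("conn-1");
        System.out.println("connections left = " + manager.connections.size());
    }
}
```

Notifying listeners only after the connection has been removed means a listener that looks the connection up again will not find a half-removed entry, which is one reason to order the steps this way.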

















