RocketMQ負載均衡機制解析!
消費者在消費消息的時候,需要知道從Broker的哪一個消息隊列中去獲取消息。
所以,在消費者端必須要做負載均衡,即Broker端中多個消費隊列分配給同一個消費者組中的哪些消費者消費。
在RocketMQ中,在消費者端有一個:Rebalance負載均衡組件。
- 他負責相對均勻的給消費者分配需要拉取的隊列信息。
消費者負載均衡
指為消費組下的每個消費者分配訂閱主題下的消費隊列,分配了消費隊列消費者就可以知道去消費哪個消費隊列上面的消息。
- 這里針對集群模式,因為廣播模式,所有的消息隊列可以被消費組下的每個消費者消費不涉及負載均衡。
而集群模式一個消息隊列同一時間只能分配給組內的一個消費者進行消費。
RocketMQ5.0以前是按照隊列粒度進行負載均衡的,5.0以后提供了按消息粒度進行負載均衡。
隊列粒度負載均衡
隊列粒度負載均衡策略中,同一消費者組內的多個消費者將按照隊列粒度消費消息,每個隊列只能被其中一個消費者消費。
隊列粒度負載均衡是在每個消費者端進行的,并不是由某個節點統一進行負載均衡之后將分配結果通知到每個消費者。
消費者增加或者減少會影響消息隊列的分配,所以Broker需要感知消費者的上下線情況。
消費者在啟動時會向所有的Broker發送心跳包進行注冊,通知Broker消費者上線,下線的時候也會向Broker發送取消注冊的請求。
Broker會維護消費者信息的注冊信息,在消費者發生變更時會通知消費者進行負載均衡。
Rebalance觸發時機
消費者啟動時觸發:
消費者在啟動時會進行一次負載均衡,為自己分配消息隊列。
Broker發現消費組變更時觸發:
處于以下兩種情況之一時會被判斷為消費組發生了變化,需要進行負載均衡:
- 某個消費組內有新的消費者向Broker進行了注冊。
- 比如某個消費組原來有兩個消費者,現在新增了一個消費者,新增的消費者啟動時會向Broker發送注冊請求。
- 消費組訂閱的主題信息發生了變化。
- 比如消費組新增訂閱了某個主題或者取消某個主題的訂閱,會被判斷為主題訂閱信息發生了變化。
被判定為變化之后,會觸發變更事件,向該消費者下的所有消費者發送發送變更請求,通知組下每個消費者進行負載均衡。
Broker收到消費者下線時觸發:
如果有消費者向Broker發送UNREGISTER_CLIENT取消注冊請求,并且開啟了允許通知變更,會觸發變更事件。
變更事件同上,Broker會通知該消費者組下的所有消費者進行一次負載均衡。
消費者定時觸發:
消費者本身也會定時執行負載均衡,默認是20s執行一次。
圖片
消息粒度負載均衡
在RocketMQ5.0之后,增加了消息粒度負載均衡策略,默認且僅使用消息粒度負載均衡策略。
消息粒度負載均衡策略中,同一消費組內的多個消費者將按照消息粒度平均分攤主題中的所有消息。
- 即同一個隊列中的消息,可被平均分配給組內多個消費者共同消費。
消息粒度負載均衡策略保證同一個隊列的消息可以被組內多個消費者共同處理。
但是該策略使用的消息分配算法結果是隨機的,不能指定消息被哪一個特定的消費者處理。
- 當消費者獲取到某條消息后,服務端會對該消息加鎖,保證該消息對其他消費者不可見,直到消息消費成功或者超時。
所以多個消費者同時消費同一個消息隊列中的消息,服務端也可以保證消息不會被多個消費者重復消費。
消息粒度負載均衡策略適用于絕大多數在線處理的業務場景,對于流式處理、聚合計算等場景,更適合隊列粒度的負載均衡策略。
執行流程
負載均衡服務執行邏輯在doRebalance函數,里面會對每個消費者組執行負載均衡操作。
consumerTable這個map對象里存儲了消費者組對應的的消費者實例。
private ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
public void doRebalance() {
//每個消費者組都有負載均衡
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}由于每個消費者組可能會消費很多topic,每個topic都有自己的不同隊列,最終是按topic的維度進行負載均衡。
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//按topic維度執行負載均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}最終負載均衡邏輯處理的實現在:
- org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic。
其中分為廣播消息和集群消息模型兩種情況處理。
圖片
負載均衡核心功能的主流程,主要做了4件事情:
圖片
負載均衡策略原理
負載均衡策略頂層接口:
/**
* Strategy Algorithm for message allocating between consumers
*/
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
* 給消費者id分配消費隊列
*/
List<MessageQueue> allocate(
final String consumerGroup, //消費者組
final String currentCID, //當前消費者id
final List<MessageQueue> mqAll, //所有的隊列
final List<String> cidAll //所有的消費者
);
}他默認共有7種負載均衡策略實現。
圖片
最常用的兩種平均分配算法:
AllocateMessageQueueAveragely
是用總數除以消費者個數,余數按消費者順序分配給消費者。
AlocateMessageQueueAveragelyByCircle
輪流一個一個分配。
參考:https://rocketmq.apache.org/zh/docs/featureBehavior/08consumerloadbalance/























