精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

簡單易用的消息隊列框架的設計與實現

開發 開發工具
消息隊列在互聯網領域里得到了廣泛的應用,它多應用在異步處理、模塊之間的解偶和高并發的消峰等場景,消息隊列中表現最好的當屬Apache開源項目Kafka,Kafka使用支持高并發的Scala語言開發,利用操作系統的緩存原理達到高性能,并且天生具有可分區,分布式的特點,而且有不同語言的客戶端,使用起來非常的方便。

[[189769]]

1 背景介紹

消息隊列在互聯網領域里得到了廣泛的應用,它多應用在異步處理、模塊之間的解偶和高并發的消峰等場景,消息隊列中表現最好的當屬Apache開源項目Kafka,Kafka使用支持高并發的Scala語言開發,利用操作系統的緩存原理達到高性能,并且天生具有可分區,分布式的特點,而且有不同語言的客戶端,使用起來非常的方便。

Kclient是Kafka生產者客戶端和消費者客戶端的一個簡單易用的框架,它具有高效集成、高性能、高穩定的高級特點。

在繼續閱讀kclient的功能特性、架構設計和使用方法之前,讀者需要對Kafka進行基本的學習和了解。如果讀者是Kafka的初學者,而且英文還不錯,也可以直接參考Kafka官方在線文檔:Kafka 0.8.2 Documentation,如果對英文不感性趣,可以在網上搜索Kakfa的中文資料進行學習,內容需要涵蓋Kafka的使用向導以及利用操作系統緩存的“空中接力”、持久化、分片機制、高可用等原理。

本文包含了背景介紹、功能特性、架構設計、使用指南、API簡介、后臺監控和管理、消息處理機模板項目、以及性能壓測相關章節。如果你想使用kclient快速的構建Kafka處理機服務,請參考消息處理機模板項目章節; 如果你想了解kclient的其他使用方式、功能特性、監控和管理等,請參考功能特性、使用指南、API簡介、后臺監控和管理等章節; 如果你想更深入的理解kclient的架構設計和性能指標,請參考架構設計和性能壓測章節。

2 功能特性

2.1 簡單易用

簡化了Kafka客戶端API的使用方法, 特別是對消費端開發,消費端開發者只需要實現MessageHandler接口或者相關子類,在實現中處理消息完成業務邏輯,并且在主線程中啟動封裝的消費端服務器即可。它提供了各種常用的MessageHandler,框架自動轉換消息到領域對象模型或者JSON對象等數據結構,讓開發者更專注于業務處理。如果使用服務源碼注解的方式聲明消息處理機的后臺,可以將一個通用的服務方法直接轉變成具有完善功能的處理Kafka消息隊列的處理機,使用起來極其簡單,代碼看起來一目了然,在框架級別通過多種線程池技術保證了處理機的高性能。

在使用方面,它提供了多種使用方式:

  1. 直接使用Java API;
  2. 與Spring環境無縫集成;
  3. 服務源碼注解,通過注解聲明方式啟動Kafka消息隊列的處理機。

除此之外,它基于注解提供了消息處理機的模板項目,可以根據模板項目通過配置快速開發Kafka的消息處理機。

2.2 高性能

為了在不同的業務場景下實現高性能, 它提供不同的線程模型:

適合輕量級服務的同步線程模型;

適合IO密集型服務的異步線程模型(細分為所有消費者流共享線程池和每個流獨享線程池)。

另外,異步模型中的線程池也支持確定數量線程的線程池和線程數量可伸縮的線程池。

2.3 高穩定性

框架級別處理了常見的異常,計入錯誤日志,可用于錯誤手工恢復或者洗數據,并實現了優雅關機和重啟等功能。

3 架構設計

3.1 線程模型

1. 同步線程模型

在這種線程模型中,客戶端為每一個消費者流使用一個線程,每個線程負責從Kafka隊列里消費消息,并且在同一個線程里進行業務處理。我們把這些線程稱為消費線程,把這些線程所在的線程池叫做消息消費線程池。這種模型之所以在消息消費線程池處理業務,是因為它多用于處理輕量級別的業務,例如:緩存查詢、本地計算等。

2. 異步線程模型

在這種線程模型中,客戶端為每一個消費者流使用一個線程,每個線程負責從Kafka隊列里消費消息,并且傳遞消費得到的消息到后端的異步線程池,在異步線程池中處理業務。我們仍然把前面負責消費消息的線程池稱為消息消費線程池,把后面的異步線程池稱為異步業務線程池。這種線程模型適合重量級的業務,例如:業務中有大量的IO操作、網絡IO操作、復雜計算、對外部系統的調用等。

后端的異步業務線程池又細分為所有消費者流共享線程池和每個流獨享線程池。

1)所有消費者流共享線程池

所有消費者流共享線程池對比每個流獨享線程池,創建更少的線程池對象,能節省些許的內存,但是,由于多個流共享同一個線程池,在數據量較大的時候,流之間的處理可能互相影響。例如,一個業務使用2個區和兩個流,他們一一對應,通過生產者指定定制化的散列函數替換默認的key-hash, 實現一個流(區)用來處理普通用戶,另外一個流(區)用來處理VIP用戶,如果兩個流共享一個線程池,當普通用戶的消息大量產生的時候,VIP用戶只有少量,并且排在了隊列的后頭,就會產生餓死的情況。這個場景是可以使用多個topic來解決,一個普通用戶的topic,一個VIP用戶的topic,但是這樣又要多維護一個topic,客戶端發送的時候需要顯式的進行判斷topic目標,也沒有多少好處。

2)每個流獨享線程池

每個流獨享線程池使用不同的異步業務線程池來處理不同的流里面的消息,互相隔離,互相獨立,不互相影響,對于不同的流(區)的優先級不同的情況,或者消息在不同流(區)不均衡的情況下表現會更好,當然,創建多個線程池會多使用些許內存,但是這并不是一個大問題。

另外,異步業務線程池支持確定數量線程的線程池和線程數量可伸縮的線程池。

  1. 核心業務硬件資源有保證,核心服務有專享的資源池,或者線上流量可預測,請使用固定數量的線程池。
  2. 非核心業務一般混布,資源互相調配,線上流量不固定等情況請使用線程數量可伸縮的線程池。

3.2 異常處理

對于消息處理過程中產生的業務異常,當前在業務處理的上層捕捉了Throwable, 在專用的錯誤恢復日志中記錄出錯的消息,后續可根據錯誤恢復日志人工處理錯誤消息,也可重做或者洗數據。TODO:考慮實現異常Listener體系結構, 對異常處理實現監聽者模式,異常處理器可插拔等,默認打印錯誤日志。

由于默認的異常處理中,捕捉異常,在專用的錯誤回復日志中記錄錯誤,并且繼續處理下一個消息。考慮到可能上線失敗,或者上游消息格式出錯,導致所有消息處理都出錯,堆滿錯誤恢復日志的情況,我們需要訴諸于報警和監控系統來解決。

3.3 優雅關機

由于消費者本身是一個事件驅動的服務器,類似Tomcat,Tomcat接收HTTP請求返回HTTP響應,Consumer則接收Kafka消息,然后處理業務后返回,也可以將處理結果發送到下一個消息隊列。所以消費者本身是非常復雜的,除了線程模型,異常處理,性能,穩定性,可用性等都是需要思考點。既然消費者是一個后臺的服務器,我們需要考慮如何優雅的關機,也就是在消費者服務器在處理消息的時候,如果關機才能不導致處理的消息中斷而丟失。

優雅關機的重點在于解決如下3個問題:

  1. 如何知道JVM要退出?
  2. 如何阻止Daemon的線程在JVM退出被殺掉而導致消息丟失?
  3. 如果Worker線程在阻塞,如何喚起并退出?

第一個問題:如果一個后臺程序運行在控制臺的前臺,通過Ctrl + C可以發送退出信號給JVM,也可以通過kill -2 PS_IS 或者 kill -15 PS_IS發送退出信號,但是不能發送kill -9 PS_IS, 否則進程會無條件強制退出。JVM收到退出信號后,會調用注冊的鉤子,我們通過的注冊的JVM退出鉤子進行優雅關機。

第二個問題:線程分為Daemon線程和非Daemon線程,一個線程默認繼承父線程的Daemon屬性,如果當前線程池是由Daemon線程創建的,則Worker線程是Daemon線程。如果Worker線程是Daemon線程,我們需要在JVM退出鉤子中等待Worker線程完成當前手頭處理的消息,再退出JVM。如果不是Daemon線程,即使JVM收到退出信號,也得等待Worker線程退出后再退出,不會丟掉正在處理的消息。

第三個問題:在Worker線程從Kafka服務器消費消息的時候,Worker線程可能處于阻塞,這時需要中斷線程以退出,沒有消息被丟掉。在Worker線程處理業務時有可能有阻塞,例如:IO,網絡IO,在指定退出時間內沒有完成,我們也需要中斷線程退出,這時會產生一個InterruptedException, 在異常處理的默認處理器中被捕捉,并寫入錯誤日志,Worker線程隨后退出。

4 使用指南

kclient提供了三種使用方法,對于每一種方法,按照下面的步驟可快速構建Kafka生產者和消費者程序。

4.1 前置步驟

1.下載源代碼后在項目根目錄執行如下命令安裝打包文件到你的Maven本地庫。

  1. mvn install 

2.在你的項目pom.xml文件中添加對kclient的依賴。

  1. <dependency> 
  2.     <groupId>com.robert.kafka</groupId> 
  3.     <artifactId>kclient-core</artifactId> 
  4.     <version>0.0.1</version> 
  5. </dependency> 

3.根據Kafka官方文檔搭建Kafka環境,并創建兩個Topic, test1和test2。

4.然后,從Kafka安裝目錄的config目錄下拷貝kafka-consumer.properties和kafka-producer.properties到你的項目類路徑下,通常是src/main/resources目錄。

4.2 Java API

Java API提供了最直接,最簡單的使用kclient的方法。

構建Producer示例:

  1. KafkaProducer kafkaProducer = new KafkaProducer("kafka-producer.properties""test"); 
  2.  
  3. for (int i = 0; i < 10; i++) { 
  4.     Dog dog = new Dog(); 
  5.     dog.setName("Yours " + i); 
  6.     dog.setId(i); 
  7.     kafkaProducer.sendBean2Topic("test", dog); 
  8.  
  9.     System.out.format("Sending dog: %d \n", i + 1); 
  10.  
  11.     Thread.sleep(100); 

構建Consumer示例:

  1. DogHandler mbe = new DogHandler(); 
  2.  
  3. KafkaConsumer kafkaConsumer = new KafkaConsumer("kafka-consumer.properties""test", 1, mbe); 
  4. try { 
  5.     kafkaConsumer.startup(); 
  6.  
  7.     try { 
  8.         System.in.read(); 
  9.     } catch (IOException e) { 
  10.         e.printStackTrace(); 
  11.     } 
  12. } finally { 
  13.     kafkaConsumer.shutdownGracefully(); 
  1. public class DogHandler extends BeanMessageHandler<Dog> { 
  2.     public DogHandler() { 
  3.         super(Dog.class); 
  4.     } 
  5.  
  6.     protected void doExecuteBean(Dog dog) { 
  7.         System.out.format("Receiving dog: %s\n", dog); 
  8.     } 

4.3 Spring環境集成

kclient可以與Spring環境無縫集成,你可以像使用Spring Bean一樣來使用KafkaProducer和KafkaConsumer。

構建Producer示例:

  1. ApplicationContext ac = new ClassPathXmlApplicationContext("kafka-producer.xml"); 
  2.  
  3. KafkaProducer kafkaProducer = (KafkaProducer) ac.getBean("producer"); 
  4.  
  5. for (int i = 0; i < 10; i++) { 
  6.     Dog dog = new Dog(); 
  7.     dog.setName("Yours " + i); 
  8.     dog.setId(i); 
  9.     kafkaProducer.send2Topic("test", JSON.toJSONString(dog)); 
  10.  
  11.     System.out.format("Sending dog: %d \n", i + 1); 
  12.  
  13.     Thread.sleep(100); 
  1. <bean name="producer" class="com.robert.kafka.kclient.core.KafkaProducer" init-method="init"
  2.     <property name="propertiesFile" value="kafka-producer.properties"/> 
  3.     <property name="defaultTopic" value="test"/> 
  4. </bean> 

構建Consumer示例:

  1. ApplicationContext ac = new ClassPathXmlApplicationContext( 
  2.         "kafka-consumer.xml"); 
  3.  
  4. KafkaConsumer kafkaConsumer = (KafkaConsumer) ac.getBean("consumer"); 
  5. try { 
  6.     kafkaConsumer.startup(); 
  7.  
  8.     try { 
  9.         System.in.read(); 
  10.     } catch (IOException e) { 
  11.         e.printStackTrace(); 
  12.     } 
  13. } finally { 
  14.     kafkaConsumer.shutdownGracefully(); 
  1. public class DogHandler extends BeanMessageHandler<Dog> { 
  2.     public DogHandler() { 
  3.         super(Dog.class); 
  4.     } 
  5.  
  6.     protected void doExecuteBean(Dog dog) { 
  7.         System.out.format("Receiving dog: %s\n", dog); 
  8.     } 
  1. <bean name="dogHandler" class="com.robert.kafka.kclient.sample.api.DogHandler" /> 
  2.  
  3. <bean name="consumer" class="com.robert.kafka.kclient.core.KafkaConsumer" init-method="init"
  4.     <property name="propertiesFile" value="kafka-consumer.properties" /> 
  5.     <property name="topic" value="test" /> 
  6.     <property name="streamNum" value="1" /> 
  7.     <property name="handler" ref="dogHandler" /> 
  8. </bean> 

4.4 服務源碼注解

kclient提供了類似Spring聲明式的編程方法,使用注解聲明Kafka處理器方法,所有的線程模型、異常處理、服務啟動和關閉等都由后臺服務自動完成,極大程度的簡化了API的使用方法,提高了開發者的工作效率。

注解聲明Kafka消息處理器:

  1. @KafkaHandlers 
  2. public class AnnotatedDogHandler { 
  3.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) 
  4.     @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1"
  5.     public Cat dogHandler(Dog dog) { 
  6.         System.out.println("Annotated dogHandler handles: " + dog); 
  7.  
  8.         return new Cat(dog); 
  9.     } 
  10.  
  11.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) 
  12.     public void catHandler(Cat cat) throws IOException { 
  13.         System.out.println("Annotated catHandler handles: " + cat); 
  14.  
  15.         throw new IOException("Man made exception."); 
  16.     } 
  17.  
  18.     @ErrorHandler(exception = IOException.class, topic = "test1"
  19.     public void ioExceptionHandler(IOException e, String message) { 
  20.         System.out.println("Annotated excepHandler handles: " + e); 
  21.     } 

注解啟動程序:

  1. public static void main(String[] args) { 
  2.     ApplicationContext ac = new ClassPathXmlApplicationContext( 
  3.             "annotated-kafka-consumer.xml"); 
  4.  
  5.     try { 
  6.         System.in.read(); 
  7.     } catch (IOException e) { 
  8.         e.printStackTrace(); 
  9.     } 

注解Spring環境配置:

  1. <bean name="kclientBoot" class="com.robert.kafka.kclient.boot.kclientBoot" init-method="init"/> 
  2.  
  3. <context:component-scan base-package="com.robert.kafka.kclient.sample.annotation" /> 

5 API簡介

5.1 Producer API

KafkaProducer類提供了豐富的API來發送不同類型的消息,它支持發送字符串消息,發送一個普通的Bean,以及發送JSON對象等。在這些API中可以指定發送到某個Topic,也可以不指定而使用默認的Topic。對于發送的數據,支持帶Key值的消息和不帶Key值的消息。

發送字符串消息:

  1. public void send(String message); 
  2. public void send2Topic(String topicName, String message);  
  3. public void send(String key, String message);  
  4. public void send2Topic(String topicName, String key, String message);  
  5. public void send(Collection<String> messages);  
  6. public void send2Topic(String topicName, Collection<String> messages);  
  7. public void send(Map<String, String> messages);  
  8. public void send2Topic(String topicName, Map<String, String> messages); 

發送Bean消息:

  1. public <T> void sendBean(T bean);  
  2. public <T> void sendBean2Topic(String topicName, T bean);  
  3. public <T> void sendBean(String key, T bean);  
  4. public <T> void sendBean2Topic(String topicName, String key, T bean);  
  5. public <T> void sendBeans(Collection<T> beans);  
  6. public <T> void sendBeans2Topic(String topicName, Collection<T> beans);  
  7. public <T> void sendBeans(Map<String, T> beans);  
  8. public <T> void sendBeans2Topic(String topicName, Map<String, T> beans); 

發送JSON對象消息:

  1. public void sendObject(JSONObject jsonObject);  
  2. public void sendObject2Topic(String topicName, JSONObject jsonObject);  
  3. public void sendObject(String key, JSONObject jsonObject);  
  4. public void sendObject2Topic(String topicName, String key, JSONObject jsonObject);  
  5. public void sendObjects(JSONArray jsonArray);  
  6. public void sendObjects2Topic(String topicName, JSONArray jsonArray);  
  7. public void sendObjects(Map<String, JSONObject> jsonObjects);  
  8. public void sendObjects2Topic(String topicName, Map<String, JSONObject> jsonObjects); 

5.2 Consumer API

KafkaConsumer類提供了豐富的構造函數用來指定Kafka消費者服務器的各項參數,包括線程池策略,線程池類型,流數量等等。

使用PROPERTIES文件初始化:

  1. public KafkaConsumer(String propertiesFile, String topic, int streamNum, MessageHandler handler); 
  2. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, MessageHandler handler); 
  3. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler); 
  4. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler); 
  5. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler); 

使用PROPERTIES對象初始化:

  1. public KafkaConsumer(Properties properties, String topic, int streamNum, MessageHandler handler); 
  2. public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, MessageHandler handler); 
  3. public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler); 
  4. public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler); 
  5. public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler); 

5.3 消息處理器

消息處理器結構提供了一個基本接口,并且提供了不同的抽象類實現不同層次的功能,讓功能得到最大化的重用,并且互相解偶,開發者可以根據需求選擇某一個抽象類來繼承和使用。

接口定義:

  1. public interface MessageHandler { 
  2.     public void execute(String message); 

安全處理異常抽象類:

  1. public abstract class SafelyMessageHandler implements MessageHandler { 
  2.     public void execute(String message) { 
  3.         try { 
  4.             doExecute(message); 
  5.         } catch (Throwable t) { 
  6.             handleException(t, message); 
  7.         } 
  8.     } 
  9.  
  10.     protected void handleException(Throwable t, String message) { 
  11.         for (ExceptionHandler excepHandler : excepHandlers) { 
  12.             if (t.getClass() == IllegalStateException.class 
  13.                     && t.getCause() != null 
  14.                     && t.getCause().getClass() == InvocationTargetException.class 
  15.                     && t.getCause().getCause() != null
  16.                 t = t.getCause().getCause(); 
  17.  
  18.             if (excepHandler.support(t)) { 
  19.                 try { 
  20.                     excepHandler.handle(t, message); 
  21.                 } catch (Exception e) { 
  22.                     log.error( 
  23.                             "Exception hanppens when the handler {} is handling the exception {} and the message {}. Please check if the exception handler is configured properly."
  24.                             excepHandler.getClass(), t.getClass(), message); 
  25.                     log.error( 
  26.                             "The stack of the new exception on exception is, "
  27.                             e); 
  28.                 } 
  29.             } 
  30.         } 
  31.     } 
  32.  
  33. protected abstract void doExecute(String message); 

面向類型的抽象類:

  1. public abstract class BeanMessageHandler<T> extends SafelyMessageHandler {...} 
  2. public abstract class BeansMessageHandler<T> extends SafelyMessageHandler {...} 
  3. public abstract class DocumentMessageHandler extends SafelyMessageHandler {...} 
  4. public abstract class ObjectMessageHandler extends SafelyMessageHandler {...} 
  5. public abstract class ObjectsMessageHandler extends SafelyMessageHandler {...} 

5.4 消息處理器注解

正如上面使用指南第三部分服務源碼注解所講述的那樣,kclient可以通過注解來聲明Kafka消息處理器,kclient提供了@KafkaHandlers、@InputConsumer、@OutputProducer和@ErrorHandler等注解。

@KafkaHandlers:

  1. @Target({ ElementType.TYPE }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. @Component 
  5. public @interface KafkaHandlers { 

@InputConsumer:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface InputConsumer { 
  5.     String propertiesFile() default ""
  6.  
  7.     String topic() default ""
  8.  
  9.     int streamNum() default 1; 
  10.  
  11.     int fixedThreadNum() default 0; 
  12.  
  13.     int minThreadNum() default 0; 
  14.  
  15.     int maxThreadNum() default 0; 

@OutputProducer:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface OutputProducer { 
  5.     String propertiesFile() default ""
  6.  
  7.     String defaultTopic() default ""

@ErrorHandler:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface ErrorHandler { 
  5.     Class<? extends Throwable> exception() default Throwable.class; 
  6.  
  7.     String topic() default ""

6 消息處理機模板項目

6.1 快速開發向導

通過下面步驟可以快速開發Kafka處理機服務。

1.從本項目下載kclient-processor項目模板,并且根據業務需要修改pom.xml后導入Eclipse。

2.根據業務需要更改com.robert.kclient.app.handler.AnimalsHandler類名稱,并且根據業務需要修改處理器的注解。這里,可以導入業務服務對消息進行處理。

  1. @KafkaHandlers 
  2. public class AnimalsHandler { 
  3.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) 
  4.     @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1"
  5.     public Cat dogHandler(Dog dog) { 
  6.         System.out.println("Annotated dogHandler handles: " + dog); 
  7.  
  8.         return new Cat(dog); 
  9.     } 
  10.  
  11.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) 
  12.     public void catHandler(Cat cat) throws IOException { 
  13.         System.out.println("Annotated catHandler handles: " + cat); 
  14.  
  15.         throw new IOException("Man made exception."); 
  16.     } 
  17.  
  18.     @ErrorHandler(exception = IOException.class, topic = "test1"
  19.     public void ioExceptionHandler(IOException e, String message) { 
  20.         System.out.println("Annotated excepHandler handles: " + e); 
  21.     } 

3.通過mvn package既可以打包包含Spring Boot功能的自啟動jar包。

4.通過java -jar kclient-processor.jar即可啟動服務。

6.2 后臺監控和管理

kclient模板項目提供了后臺管理接口來監控和管理消息處理服務。

1.歡迎詞 - 用來校驗服務是否啟動成功。

curl http://localhost:8080/

2.服務狀態 - 顯示處理器數量。

curl http://localhost:8080/status

3.重啟服務 - 重新啟動服務。

curl http://localhost:8080/restart

7 性能壓測

Benchmark應該覆蓋推送QPS、接收處理QPS以及單線程、多線程生產者的用例。

用例1: 輕量級服務同步線程模型和異步線程模型的性能對比。

用例2: 重量級服務同步線程模型和異步線程模型的性能對比。

用例3: 重量級服務異步線程模型中所有消費者流共享線程池和每個流獨享線程池的性能對比。

用例4: 重量級服務異步線程模型中每個流獨享線程池的對比的確定數量線程的線程池和線程數量可伸縮的線程池的性能對比。

由于筆者在發文的時候還沒有時間完成前面四種場景的壓測,暫時留給讀者親自動手,作為一個練習。

8 更多思考

盡管本文設計和實現的kclient項目提供了許多高級功能,并且使用起來方便,而且筆者在幾家公司里在線上進行了應用,已經發揮了不少的作用,但是,還有一些細節需要提高。

kclient處理器項目中管理Restful服務暫時只提供了獲得狀態的API,需要進行進一步的豐富,增加對線程池的監控,以及消息處理性能的監控。

當前注解@ErrorHandler里面的exception參數為必選,完全可以使用方法第一參數進行推導,簡化開發人員配置的工作。

模板項目還不完善,需要增加啟動和關閉腳本,這樣讀者可以直接拷貝使用。

盡管線上應用已經證明了kclient沒有性能問題,但是開發一款中間件系統是需要閉環的,需要盡快把性能壓測這塊昨晚并且形成壓測報表。

點擊《簡單易用的消息隊列框架的設計與實現》閱讀原文。

【本文為51CTO專欄作者“李艷鵬”的原創稿件,轉載可通過作者簡書號(李艷鵬)或51CTO專欄獲取聯系】

戳這里,看該作者更多好文

責任編輯:武曉燕 來源: 51CTO專欄
相關推薦

2023-09-12 14:58:00

Redis

2023-11-13 08:37:33

消息中間件分布式架構

2024-05-07 09:02:47

2023-12-30 13:47:48

Redis消息隊列機制

2015-11-10 18:04:22

FileMaker

2022-01-15 07:20:18

Redis List 消息隊列

2010-02-26 13:14:39

Java日志系統

2024-02-26 07:43:10

大語言模型LLM推理框架

2022-01-21 19:22:45

RedisList命令

2024-10-16 15:11:58

消息隊列系統設計

2024-10-25 08:41:18

消息隊列RedisList

2022-05-14 23:49:32

Python數據計算技巧

2021-12-16 13:04:41

消息隊列緩存

2022-09-07 21:43:34

云原生存儲技術消息隊列

2025-03-12 07:55:46

2022-04-03 15:44:55

Vue.js框架設計設計與實現

2016-09-18 18:27:21

KubernetesDocker

2025-10-20 04:00:00

2019-07-24 14:49:48

SQL開源庫BI軟件

2025-11-11 09:25:19

點贊
收藏

51CTO技術棧公眾號

欧美中文字幕一区二区| √天堂8在线网| 在线欧美视频| 精品在线观看国产| 久久精品免费网站| 蜜桃av在线免费观看| 国产精品99久久久久久久女警| 久久久久久久久久久人体| 给我免费观看片在线电影的| 日韩成人av电影| 亚洲天堂精品在线观看| 国产精品免费区二区三区观看| 五月婷婷开心网| 日韩欧美网址| 亚洲高清在线观看| www.亚洲高清| 日本在线影院| 亚洲精品一二三| 欧美日韩国产一二| 国产激情无套内精对白视频| 亚洲综合激情| 久久精品久久久久久| 在线观看欧美www| 成人午夜视频免费在线观看| 国产精品va在线观看视色| 国产91丝袜在线观看| 秋霞av国产精品一区| 亚洲天堂岛国片| 国产在线一区不卡| 日本韩国一区二区| 男人添女荫道口喷水视频| 2019中文字幕在线视频| av在线播放一区二区三区| 91久久久久久久| 最新中文字幕第一页| 99伊人成综合| 久久91精品国产91久久久| 亚洲一级片在线播放| 日本妇女一区| 亚洲成年人在线| 天天爽夜夜爽视频| 久久av影院| 色狠狠一区二区三区香蕉| 国产黄色片免费在线观看| bt在线麻豆视频| 亚洲丝袜美腿综合| 五月天综合网| 青青久草在线| 91亚洲午夜精品久久久久久| 91精品婷婷国产综合久久蝌蚪| 97国产成人无码精品久久久| 美国av一区二区| 国产精品一区av| 亚洲午夜无码久久久久| 日韩不卡免费视频| 国产成人91久久精品| 青草视频在线观看免费| 国产精品社区| 7m第一福利500精品视频| 久久久久黄色片| 欧美激情视频一区二区三区在线播放| 日韩在线小视频| 欧美成人短视频| 四虎成人精品永久免费av九九| 亚洲一区二区久久久| 中文字幕av久久爽一区| 凹凸成人精品亚洲精品密奴| 中文字幕欧美在线| 欧美风情第一页| 亚洲激情中文在线| 欧美激情伊人电影| 五月天婷婷丁香| 香蕉久久国产| 国产精品久久久久久久久久久久| 91福利免费视频| 国产精品一区二区三区网站| 成人黄色片视频网站| 精品国产av一区二区| 成人免费视频免费观看| 久久久精彩视频| 国产在线视频网| 国产精品电影一区二区三区| 欧美少妇一区二区三区| 黄页网站在线| 色综合久久久久综合体桃花网| 免费黄色特级片| 日韩色性视频| 日韩免费观看高清完整版 | 欧美午夜影院| 高清一区二区三区日本久| av黄色在线播放| 久久99国产精品免费网站| 97视频中文字幕| 你懂的在线网址| 国产精品久久久久9999吃药| 久操手机在线视频| 中日韩脚交footjobhd| 欧美天堂一区二区三区| 熟妇无码乱子成人精品| 久久久亚洲欧洲日产| 尤物九九久久国产精品的特点| 日韩一区二区三区四区在线| 免费中文字幕日韩欧美| 亚洲最大成人网色| 日韩av成人| 亚洲情趣在线观看| 国产成人a亚洲精v品无码| 日韩护士脚交太爽了| 亚洲国语精品自产拍在线观看| 女人十八毛片嫩草av| 影音先锋久久精品| 国产欧美一区二区三区视频 | 国产成人午夜视频| 久久婷婷国产综合尤物精品| 成人无遮挡免费网站视频在线观看| 午夜激情一区二区三区| 天堂中文av在线| 在线看成人短视频| 色综合久久悠悠| 7777久久亚洲中文字幕| 久久精品亚洲精品国产欧美kt∨| 国产欧美久久久久| 91成人app| 亚洲欧洲在线播放| 日韩成人高清视频| 国产精品18久久久久久久久久久久| 日韩电影大全在线观看| 波多野结衣视频一区二区| 欧美一区日韩一区| 蜜臀久久99精品久久久久久| 99精品热6080yy久久| 99国产在线视频| 激情成人四房播| 欧美吻胸吃奶大尺度电影| 亚洲精品乱码久久| 红桃视频亚洲| 97人人干人人| 亚洲淫性视频| 欧美一级日韩免费不卡| 手机免费观看av| 久久久蜜桃一区二区人| 久久精品成人一区二区三区蜜臀| 丝袜在线观看| 日韩免费一区二区三区在线播放| 国产免费美女视频| 精品一区二区综合| 在线免费观看成人网| 久久亚洲人体| 日韩中文字幕第一页| 中文字字幕在线中文乱码| 国产日韩av一区二区| 免费在线观看毛片网站| 国产一区二区区别| 国产精品成人观看视频国产奇米| 春暖花开成人亚洲区| 日本高清不卡在线观看| 人人人妻人人澡人人爽欧美一区| 水蜜桃久久夜色精品一区的特点| 欧美一级爽aaaaa大片| 亚洲国产欧美日本视频| 亚洲欧美一区二区三区四区| 91久久国产综合久久91| 欧美经典一区二区三区| 中文字幕永久有效| 欧美激情aⅴ一区二区三区| **亚洲第一综合导航网站| 午夜伦理大片视频在线观看| 欧美成人艳星乳罩| 精品成人久久久| 久久美女艺术照精彩视频福利播放| 成年人黄色片视频| 色男人天堂综合再现| 91久久久久久久久久久| 欧美xxx黑人xxx水蜜桃| 亚洲精品久久久一区二区三区 | 中文字幕免费一区二区| av一区二区三区免费| 激情黄产视频在线免费观看| 亚洲欧洲在线视频| 国产精品久久欧美久久一区| 一区二区在线观看视频在线观看| 国产精品果冻传媒| 视频精品一区二区| 天堂v在线视频| 国产精品自在| 国产精品久久久久福利| av电影高清在线观看| 亚洲精品国产品国语在线| 免费黄色一级大片| 亚洲精品乱码久久久久久 | 粉嫩精品导航导航| 日韩免费在线观看视频| 麻豆网站视频在线观看| 亚洲国产欧美一区二区丝袜黑人| 91porny九色| 亚洲激情校园春色| 少妇无套高潮一二三区| 高清在线观看日韩| 亚洲精品一二三四五区| 欧美午夜不卡影院在线观看完整版免费| 久久手机视频| 亚洲精品v亚洲精品v日韩精品| 日韩av免费一区| 羞羞网站在线看| 在线看欧美日韩| 亚洲国产精品无码久久| 欧美中文字幕一区二区三区| 久热精品在线观看| 国产精品免费丝袜| 亚洲欧美色图视频| 国产精品99久久久| 亚洲成人av免费看| 99国产精品| 粉嫩av一区二区三区天美传媒 | 久久精品一区二区三| 欧美国产一区视频在线观看| 182在线视频| 国产一区二区三区四区在线观看| 国产欧美在线一区| 很黄很黄激情成人| 永久免费精品视频网站| 欧美热在线视频精品999| 国产一区二区不卡视频| 国产精品xnxxcom| 国产精品久久久久7777婷婷| 激情国产在线| 国语自产精品视频在免费| 黄色在线免费| 日韩在线中文字幕| 国产三级视频在线| 国产婷婷97碰碰久久人人蜜臀 | 国产树林野战在线播放| 精品一区二区三区的国产在线观看| 国产福利久久| 视频一区日韩精品| 亚洲一区二区三区久久| 欧美美女被草| 国产精品一区久久| 外国电影一区二区| 国产精品免费福利| 成人视屏在线观看| 国产精品jvid在线观看蜜臀| 悠悠资源网亚洲青| 97成人精品区在线播放| 69av成人| 97超级碰碰人国产在线观看| 98色花堂精品视频在线观看| 欧美激情视频免费观看| 女同一区二区免费aⅴ| 精品中文字幕在线2019| 性欧美高清come| 美女av一区二区三区 | 亚洲黄页网站| 久久久水蜜桃| 婷婷精品在线观看| 免费精品视频一区| 精品国产一区二区三区香蕉沈先生| 欧洲精品码一区二区三区免费看| 国产精品一区2区3区| 翔田千里亚洲一二三区| 色小子综合网| 久久免费一级片| 精品二区视频| 春日野结衣av| 奇米在线7777在线精品| 色噜噜狠狠永久免费| 国产综合色视频| 亚洲av无一区二区三区久久| 成人免费视频视频| 性久久久久久久久久| 国产精品网站在线播放| 天海翼在线视频| 一区二区激情视频| 国产www在线| 欧美三区在线视频| 国产wwwwwww| 亚洲国产女人aaa毛片在线| 欧美男男激情freegay| 综合激情国产一区| 色老头在线观看| 欧美一级视频在线观看| 日韩制服诱惑| 产国精品偷在线| 久久综合影院| 国产在线拍揄自揄拍无码| 99精品99| av中文字幕网址| 99国产欧美久久久精品| 国产白丝一区二区三区| 一区av在线播放| 国产99久久久久久免费看| 91精品国产一区二区三区蜜臀 | 伊人精品在线观看| av网址在线免费观看| 2019中文字幕在线| 96视频在线观看欧美| 黄色一区三区| 99视频精品全部免费在线视频| 日本中文字幕亚洲| 麻豆国产欧美一区二区三区| 国产精品手机在线观看| 欧美激情一区二区三区| 日韩乱码一区二区| 337p亚洲精品色噜噜噜| 色播色播色播色播色播在线 | 日韩高清不卡一区二区三区| 能看毛片的网站| 日本一区二区三区在线观看| 国产真实乱人偷精品视频| 欧美日韩成人综合| 日本在线丨区| 欧美日韩成人网| 久久精品黄色| 美女主播视频一区| 国产精品地址| 在线视频日韩欧美| 中文字幕高清不卡| 亚洲欧美偷拍视频| 亚洲成人av在线播放| 国产区在线看| 国产精品视频色| 亚洲欧美tv| 水蜜桃色314在线观看| 久久99精品网久久| 久久久精品成人| 精品久久香蕉国产线看观看亚洲| 国产激情无套内精对白视频| 日韩视频亚洲视频| 精品日本视频| 秋霞久久久久久一区二区| 亚洲激情偷拍| 国产精品无码自拍| 亚洲另类春色国产| 国产人妻精品一区二区三| 夜夜嗨av一区二区三区免费区| 天堂中文在线播放| 精品国产一区二区三| 亚洲大胆在线| 怡红院一区二区| 亚洲一区二区三区免费视频| 精品国产亚洲AV| 欧美日韩福利电影| 久久久国产精品入口麻豆| 国产一区一区三区| 激情综合色播五月| 成人免费毛片xxx| 678五月天丁香亚洲综合网| 黄在线免费看| 99视频国产精品免费观看| 一级毛片免费高清中文字幕久久网| 91亚洲精品久久久蜜桃借种| 国产精品久久久久久久久久免费看 | 亚欧美在线观看| 欧美国产精品一区| 中文字幕一区二区三区四区免费看| 国产亚洲激情在线| 国产精品麻豆成人av电影艾秋| 亚洲欧美日韩精品久久久 | 五月激情六月综合| 午夜性色福利视频| 欧洲成人在线视频| 国产精品亚洲人成在99www| 美女一区二区三区视频| 国产精品美女久久久久久久久久久 | 国产一级免费片| 欧美视频精品一区| 国产免费av在线| 国产综合久久久久| 中文字幕乱码亚洲无线精品一区| 中文字幕视频观看| 欧美日韩亚洲高清| 国产一二在线观看| 成人久久一区二区三区| 亚洲欧美亚洲| a级在线观看视频| 欧美日韩卡一卡二| 手机在线免费看av| 久久综合入口| 久久精品国产亚洲aⅴ| 九九免费精品视频| 亚洲精品理论电影| 国产精品.xx视频.xxtv| 日本xxxxx18| 99久久精品费精品国产一区二区| 中文字幕黄色片| www日韩欧美| 久久超级碰碰| 91av俱乐部| 亚洲男帅同性gay1069| 婷婷五月综合激情| 国产主播精品在线| 亚洲黄网站黄| 成人免费视频入口| 亚洲成人中文字幕| 成人免费在线观看视频| 丁香色欲久久久久久综合网| 久久网站热最新地址| 国产成人精品一区二三区四区五区| 欧美在线视频一区二区| 亚洲经典一区|