高并發!Spring Boot 響應式 SSE 實時推送,單機吞吐量10萬+
環境:SpringBoot3.4.2
1. 簡介
在如今互聯網應用中,實時消息推送的需求日益增長,例如股票行情、在線聊天室消息更新等場景。Spring Boot 響應式 SSE(Server-Sent Events)技術為此提供了高效解決方案。
SSE 允許服務器通過一個持久化的 HTTP 連接,將數據實時推送給客戶端,無需客戶端頻繁輪詢。在 Spring Boot 響應式編程模型下,結合 WebFlux,能充分發揮 SSE 的優勢,輕松應對高并發場景。
要基于WebFlux 響應式技術通過SSE實現實時廣播消息,我們首先需要了解一個核心組件Sink。
什么是 Sinks?
Sinks 既是發布者(publisher),又是訂閱者(subscriber)。多個數據流可以通過一端發送數據,而另一端則像一個 Flux,訂閱者可以在其中觀察元素。
Sinks 的關鍵特性:
- 發布者(Publisher):該對象允許使用 emit 或 tryEmit 方法將數據推送到數據流中。
- 訂閱者(Subscriber):Sink 中的數據可以通過 asFlux() 或 asMono() 方法傳播給訂閱者。
Sinks 如何工作?
- 生成數據:調用 tryEmitNext()、tryEmitComplete() 或 tryEmitError() 等方法,用于發送數據、完成數據流或報告錯誤。
- 消費數據:訂閱者使用 asFlux() 或 asMono() 連接到 Sink,并在數據到達時接收數據。
Sinks 的類型
Sinks 種類繁多,適用于不同的使用場景:
- Sinks.Many:允許發送多個元素,有以下幾種選項:
unicast() —— 僅向一個訂閱者提供數據
multicast() —— 允許數據同時傳遞給多個訂閱者
replay() —— 新訂閱者會接收到在他們訂閱之前發送的最新元素。
- Sinks.One:用于發送單個元素。
如下 Sink 工作原理:
圖片
了解了這最核心的Sink組件后,接下來,我們將通過一個完整的示例演示。
2.實戰案例
2.1 準備環境
定義接收消息的對象
public class Message {
private Integer id ;
private String author ;
private String time ;
private String message ;
// getters, setters
}2.2 定義Sink
根據上面的介紹,定義Sink實現消息的發布及訂閱,所有訂閱者都可以通過該Sink獲取實時最新的消息。
@Configuration
public class SinkConfig {
@Bean
Sinks.Many<Message> sink() {
return Sinks.many().replay().limit(1) ;
}
}解釋:
Sinks.Many: 這是一個能夠處理多個 Message 類型元素的接收器(Sink)- replay().limit: 確保新訂閱者在連接時能夠獲取到已發布的最后一項數據。這對于那些希望立即獲取最新數據的新訂閱者來說非常有意義。
2.3 消息訂閱/發布
@Service
public class MessageService {
private final Sinks.Many<Message> messageSink ;
public MessageService(Many<Message> messageSink) {
this.messageSink = messageSink;
}
public Mono<Message> saveMessage(Mono<Message> message) {
return message.doOnNext(messageSink::tryEmitNext) ;
}
public Flux<Message> messageStream() {
return messageSink.asFlux() ;
}
}解釋:
- tryEmitNext: 嘗試發送一個非空元素,生成一個 onNext 信號。此次嘗試的結果會以 EmitResult 的形式表示,該結果可能指示出錯誤情況。
- asFlux: 返回此 Sink 的一個 Flux 視圖。每次調用都會返回同一個實例。
2.4 Controller接口
@RestController
@RequestMapping("/messages")
public class MessageController {
private final AtomicInteger count = new AtomicInteger() ;
private final MessageService messageService;
public MessageController(MessageService messageService) {
this.messageService = messageService;
}
@GetMapping("/send")
public Mono<Message> sendMessage(String message) {
String time = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()) ;
Message msg = new Message(count.incrementAndGet(), "Pack", time, message) ;
return messageService.saveMessage(Mono.just(msg)) ;
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message> messageStream() {
return messageService.messageStream() ;
}
}以上我們就完成了后端接口的所有開發工作。接下來,我們實現前端頁面及功能。
2.5 前端頁面
HTML部分
<div class="container">
<h1>實時消息</h1>
<div class="controls">
<div class="buttons">
<button id="startBtn">實時監聽消息</button>
<button id="stopBtn" disabled>停止消息監聽</button>
</div>
</div>
<table>
<thead>
<tr>
<th>編號</th>
<th>作者</th>
<th>時間</th>
<th>內容</th>
</tr>
</thead>
<tbody id="messages"></tbody>
</table>
</div>CSS樣式
body {font-family: 'Roboto', sans-serif;margin: 0;background-color: #f5f5f5;display: flex;justify-content: center;align-items: flex-start;padding: 20px;}
.container {width: 90%;max-width: 1000px;margin: 0 auto;text-align: center;}
h1 {font-size: 2rem;font-weight: 500;color: #4285f4;margin-bottom: 20px;}
.controls {display: flex;justify-content: space-between;margin-bottom: 20px;}
.buttons {display: flex;gap: 10px;}
button {padding: 8px 16px;background-color: #4285f4;color: white;border: none;border-radius: 5px;font-size: 1rem;cursor: pointer;}
button:disabled {background-color: #ccc;cursor: not-allowed;}
table {width: 100%;border-collapse: collapse;background-color: white;box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);border-radius: 8px;overflow: hidden;}
th, td {padding: 15px;text-align: left;border-bottom: 1px solid #ddd;}
th {background-color: #f1f1f1;font-weight: 500;color: #333;}
tr:hover {background-color: #f9f9f9;}JavaScript
<script>
let eventSource;
const messageContainer = document.getElementById('messages');
const startBtn = document.getElementById('startBtn');
const stopBtn = document.getElementById('stopBtn');
function startStream() {
const url = `http://localhost:8080/messages/stream`;
eventSource = new EventSource(url);
eventSource.onmessage = event => {
const data = JSON.parse(event.data);
if (data) {
const message = `
<tr>
<td class="${data.type}">${data.id}</td>
<td>${data.author}</td>
<td>${data.time}</td>
<td>${data.message}</td>
</tr>
`;
messageContainer.insertAdjacentHTML('afterbegin', message);
}
};
startBtn.disabled = true;
stopBtn.disabled = false;
}
function stopStream() {
if (eventSource) {
eventSource.close();
startBtn.disabled = false;
stopBtn.disabled = true;
}
}
startBtn.addEventListener('click', startStream);
stopBtn.addEventListener('click', stopStream);
</script>最終頁面效果如下:
圖片
2.6 測試
下面最終展示效果:
圖片

































