SpringBoot + ResponseBodyEmitter 實時異步流式推送,優雅!
兄弟們,今天咱們來聊個挺酷的技術 —— 用 SpringBoot 結合 ResponseBodyEmitter 搞實時異步流式推送。估計不少人看到 “流式推送” 這四個字,腦子里已經開始浮現各種復雜的場景了,比如實時聊天、數據監控啥的。別急,咱們今天就用大白話,配上點小幽默,把這玩意兒給盤明白。
先來說說為啥需要這東西。咱們平時寫的 Web 接口,大多是 “一問一答” 的模式。客戶端發個請求,服務器哐哐哐處理完,打包成一個響應扔回去,完事。這就好比你去便利店買瓶可樂,付了錢拿了貨,轉身就走,整個過程干凈利落。
但有些場景就不一樣了。比如說,你在看直播的時候,主播那邊的畫面和聲音是一幀一幀、一秒一秒不斷傳過來的,總不能等直播結束了,一次性給你整個視頻文件吧?再比如,你在用某個數據分析工具,想實時看到數據處理的進度,總不能每隔兩秒就手動刷新一次頁面吧?這時候,“流式推送” 就派上用場了 —— 服務器可以像流水一樣,一點一點把數據推給客戶端,不用等所有事情都做完。
在 ResponseBodyEmitter 出現之前,咱們想搞這種實時推送,要么用 WebSocket,要么用輪詢。WebSocket 是個好東西,全雙工通信,但是配置起來有點麻煩,對于一些簡單的場景來說,有點 “殺雞用牛刀” 的感覺。輪詢就更別說了,客戶端傻乎乎地每隔一段時間就問服務器 “有新數據嗎?有新數據嗎?”,不僅效率低,還浪費資源,服務器估計都想拉黑這種客戶端。
那 ResponseBodyEmitter 是啥呢?簡單說,它就是 SpringBoot 提供的一個工具,能讓服務器一邊處理數據,一邊把處理好的部分一點點推給客戶端,不用等全部處理完。就像你點了一份小龍蝦,廚師不是等所有蝦都做好了才端上來,而是炒好一盤先給你端一盤,讓你先吃著,他繼續炒下一盤。這體驗,是不是比干等著強多了?
咱們先來看個簡單的例子,感受一下 ResponseBodyEmitter 的用法。
首先,得在 SpringBoot 項目里引入 Web 依賴,這個不用多說,搞 SpringBoot 的都懂。然后,寫一個 Controller:
@RestController
public class EmitterController {
@GetMapping("/stream")
public ResponseBodyEmitter streamData() {
// 創建一個ResponseBodyEmitter對象,這里可以設置超時時間,比如30秒
ResponseBodyEmitter emitter = new ResponseBodyEmitter(30000L);
// 開個線程處理數據,模擬耗時操作
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
// 每隔1秒推送一次數據
Thread.sleep(1000);
// 推送數據,這里可以是字符串、對象等
emitter.send("這是第" + (i + 1)次推送的數據");
}
// 推送完成,關閉emitter
emitter.complete();
} catch (Exception e) {
// 發生異常時,通知客戶端
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}就這么幾行代碼,一個簡單的流式推送接口就搞定了。客戶端請求/stream接口后,不會馬上收到一個完整的響應,而是會每隔 1 秒收到一段數據,一共收到 5 段,最后連接關閉。是不是很簡單?可能有兄弟會問,這玩意兒是怎么實現的呢?其實原理也不復雜。當服務器返回 ResponseBodyEmitter 對象時,Spring 并不會馬上把響應發給客戶端,而是會保持這個連接。然后,當我們調用 emitter.send () 方法時,Spring 就會把數據一點點寫到響應流里,客戶端就能實時收到了。直到調用 emitter.complete (),或者發生異常調用 emitter.completeWithError (),這個連接才會被關閉。
這里有個小細節,就是 ResponseBodyEmitter 的超時時間。如果服務器在規定時間內沒做任何操作(既沒 send 數據,也沒 complete),這個連接就會超時關閉。所以在實際使用中,得根據業務場景合理設置超時時間,別太短也別太長。太短了,可能數據還沒推完就斷了;太長了,會占用服務器連接資源。
咱們再來說說客戶端怎么接收這些流式數據。如果是瀏覽器端,可以用 JavaScript 的 Fetch API 或者 XMLHttpRequest 來接收。比如用 Fetch:
fetch('/stream')
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
function read() {
return reader.read().then(({ done, value }) => {
if (done) {
console.log('推送完成');
return;
}
// 解碼收到的二進制數據為字符串
const data = decoder.decode(value, { stream: true });
console.log('收到數據:', data);
return read();
});
}
return read();
})
.catch(error => {
console.error('發生錯誤:', error);
});這樣,瀏覽器控制臺就會每隔 1 秒打印出服務器推送的數據,體驗還是挺不錯的。不過,上面那個例子還是太簡單了,實際項目中用起來,還得考慮不少問題。比如說,怎么管理多個 Emitter 實例?如果一個客戶端打開了多個頁面,或者多個客戶端同時連接,總不能每個請求都 new 一個 Emitter 吧?這時候,咱們就需要一個 Emitter 的管理器了。
咱們可以搞一個 EmitterManager 類,用來保存和管理所有的 Emitter 實例:
@Component
public class EmitterManager {
// 用一個Map來保存Emitter,key可以是用戶標識,value是對應的Emitter
private final Map<String, ResponseBodyEmitter> emitters = new ConcurrentHashMap<>();
// 添加Emitter
public void addEmitter(String userId, ResponseBodyEmitter emitter) {
// 當Emitter完成或出錯時,自動從Map中移除
emitter.onCompletion(() -> emitters.remove(userId));
emitter.onError(e -> emitters.remove(userId));
emitters.put(userId, emitter);
}
// 根據用戶標識獲取Emitter
public ResponseBodyEmitter getEmitter(String userId) {
return emitters.get(userId);
}
// 移除Emitter
public void removeEmitter(String userId) {
emitters.remove(userId);
}
// 向所有用戶推送數據
public void sendToAll(Object data) throws IOException {
for (ResponseBodyEmitter emitter : emitters.values()) {
emitter.send(data);
}
}
// 向指定用戶推送數據
public void sendToUser(String userId, Object data) throws IOException {
ResponseBodyEmitter emitter = emitters.get(userId);
if (emitter != null) {
emitter.send(data);
}
}
}有了這個管理器,咱們就可以在 Controller 里根據用戶標識來管理 Emitter 了。比如,用戶登錄后,建立一個 Emitter 連接,服務器可以向這個用戶單獨推送數據,也可以向所有用戶廣播數據。
@RestController
public class UserEmitterController {
@Autowired
private EmitterManager emitterManager;
@GetMapping("/user/stream/{userId}")
public ResponseBodyEmitter userStream(@PathVariable String userId) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter(30000L);
emitterManager.addEmitter(userId, emitter);
return emitter;
}
@PostMapping("/send/to/user")
public String sendToUser(@RequestParam String userId, @RequestParam String message) {
try {
emitterManager.sendToUser(userId, message);
return "發送成功";
} catch (IOException e) {
return "發送失敗:" + e.getMessage();
}
}
@PostMapping("/send/to/all")
public String sendToAll(@RequestParam String message) {
try {
emitterManager.sendToAll(message);
return "發送成功";
} catch (IOException e) {
return "發送失敗:" + e.getMessage();
}
}
}這樣一來,就實現了針對用戶的實時推送功能。比如在一個在線聊天系統里,A 用戶給 B 用戶發消息,服務器就可以通過 B 用戶的 Emitter 把消息推過去;如果有系統公告,就可以通過 sendToAll 推給所有在線用戶。不過,這里有個潛在的問題:如果用戶長時間不操作,Emitter 超時關閉了,這時候再給這個用戶推送消息,就會失敗。所以,咱們得想個辦法,讓客戶端在連接超時前,自動重新建立連接。
客戶端可以這么搞:當檢測到連接關閉后,隔一小會兒就重新發起請求,建立新的 Emitter 連接。比如在 JavaScript 里:
function connect(userId) {
fetch(`/user/stream/${userId}`)
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
function read() {
return reader.read().then(({ done, value }) => {
if (done) {
console.log('連接關閉,重新連接...');
// 連接關閉后,1秒后重新連接
setTimeout(() => connect(userId), 1000);
return;
}
const data = decoder.decode(value, { stream: true });
console.log('收到消息:', data);
return read();
});
}
return read();
})
.catch(error => {
console.error('連接出錯,重新連接...', error);
setTimeout(() => connect(userId), 1000);
});
}
// 頁面加載時,建立連接
connect('當前登錄用戶的ID');這樣就能保證客戶端和服務器之間一直有一個活躍的連接了。再來說說數據格式的問題。上面的例子里,咱們推送的都是字符串,實際項目中,可能需要推送 JSON 格式的數據。沒關系,ResponseBodyEmitter 支持發送各種類型的對象,Spring 會自動幫我們進行序列化。
比如,咱們定義一個消息類:
public class Message {
private String type; // 消息類型
private String content; // 消息內容
private long timestamp; // 時間戳
// 構造方法、getter、setter省略
}然后在發送的時候,直接 send 這個對象就行:
Message message = new Message();
message.setType("chat");
message.setContent("你好啊,老鐵");
message.setTimestamp(System.currentTimeMillis());
emitter.send(message);Spring 會默認把它序列化成 JSON 格式的字符串推送給客戶端,客戶端收到后再解析成 JSON 對象就行,非常方便。可能有細心的兄弟發現了,咱們在發送數據的時候,用的是 emitter.send () 方法,那這個方法是線程安全的嗎?答案是:是的。ResponseBodyEmitter 的 send () 方法是線程安全的,所以我們可以在多個線程里同時調用 send () 方法推送數據,不用擔心并發問題。這一點還是挺貼心的,省得咱們自己加鎖了。
不過,雖然 send () 方法是線程安全的,但也不能瞎用。如果推送的數據量特別大,或者推送頻率特別高,還是可能會造成性能問題。這時候,咱們可能需要考慮用線程池來管理發送線程,避免創建過多的線程。
比如,可以在 EmitterManager 里注入一個線程池:
@Component
public class EmitterManager {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
// 其他代碼省略...
public void sendToUser(String userId, Object data) {
ResponseBodyEmitter emitter = emitters.get(userId);
if (emitter != null) {
executor.submit(() -> {
try {
emitter.send(data);
} catch (IOException e) {
// 處理異常
emitter.completeWithError(e);
emitters.remove(userId);
}
});
}
}
}這樣,發送數據的操作就會交給線程池里的線程去處理,不會阻塞主線程,也能控制并發量。另外,咱們還得考慮一個問題:如果服務器重啟了,或者 Emitter 因為某些原因意外關閉了,客戶端怎么知道?除了客戶端自動重連之外,服務器也可以在推送一些 “心跳” 數據,告訴客戶端 “我還活著”。比如,每隔一段時間,給所有客戶端推送一個空消息或者特定格式的心跳消息。
@Component
publicclass HeartbeatTask {
@Autowired
private EmitterManager emitterManager;
@Scheduled(fixedRate = 10000) // 每10秒執行一次
public void sendHeartbeat() {
try {
Message heartbeat = new Message();
heartbeat.setType("heartbeat");
heartbeat.setContent("");
heartbeat.setTimestamp(System.currentTimeMillis());
emitterManager.sendToAll(heartbeat);
} catch (IOException e) {
// 處理異常
}
}
}客戶端收到心跳消息后,就知道連接還是正常的,不用重新連接。如果超過一定時間沒收到心跳,就主動重新連接。說到這里,可能有兄弟會拿 ResponseBodyEmitter 和 WebSocket 做比較。其實,這倆各有各的適用場景。WebSocket 適合那種需要雙向通信、實時性要求特別高的場景,比如在線游戲、實時協作編輯等。而 ResponseBodyEmitter 更適合那種服務器單向推送數據,客戶端主要是接收的場景,比如實時數據監控、消息通知等。而且,ResponseBodyEmitter 用起來比 WebSocket 簡單多了,不需要額外的協議支持,直接在 HTTP 協議上就能玩。
還有一個和 ResponseBodyEmitter 類似的東西,叫 SseEmitter,它是 ResponseBodyEmitter 的子類,專門用來支持 Server-Sent Events(SSE)。SSE 是一種 HTML5 規范,專門用于服務器向客戶端推送事件流。如果你的客戶端是瀏覽器,用 SseEmitter 可能更合適,因為瀏覽器原生支持 SSE 的接收。
SseEmitter 的用法和 ResponseBodyEmitter 差不多,就是 send 方法可以發送 SseEventBuilder 對象,能設置事件 ID、事件類型等信息:
@GetMapping("/sse/stream")
public SseEmitter sseStream() {
SseEmitter emitter = new SseEmitter(30000L);
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
// 構建一個SSE事件
SseEmitter.SseEventBuilder event = SseEmitter.event()
.id(String.valueOf(i)) // 事件ID
.name("message") // 事件類型
.data("這是第" + (i + 1)次SSE推送的數據");
emitter.send(event);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}客戶端用瀏覽器原生的 EventSource 來接收:
const source = new EventSource('/sse/stream');
// 監聽message類型的事件
source.addEventListener('message', function(event) {
console.log('收到SSE數據:', event.data);
});
// 監聽錯誤
source.onerror = function(error) {
console.error('SSE錯誤:', error);
source.close();
};是不是也挺簡單的?所以,如果你的應用主要面向瀏覽器客戶端,SseEmitter 可能是個更好的選擇,畢竟是瀏覽器原生支持的。咱們再來說說 ResponseBodyEmitter 在實際項目中可能遇到的一些坑。
第一個坑就是超時問題。如果推送數據的時間間隔比較長,超過了設置的超時時間,Emitter 就會自動關閉。這時候,要么把超時時間設置長一點,要么在超時前發送一個心跳消息,重置超時計時器。
第二個坑是內存泄漏。如果 Emitter 關閉后沒有從管理器中移除,就會造成內存泄漏。所以,一定要在 Emitter 的 onCompletion 和 onError 回調里,把它從 Map 中刪掉,就像咱們在 EmitterManager 里做的那樣。
第三個坑是數據亂序。因為 send () 方法可以在多個線程中調用,如果多個線程同時發送數據,客戶端收到的數據可能是亂序的。如果你的業務要求數據必須有序,那就要自己想辦法保證,比如給數據加個序號,客戶端收到后再排序。
第四個坑是大數據量推送。如果一次性推送的數據量特別大,可能會導致客戶端處理不過來,或者服務器內存占用過高。這時候,可以把大數據拆分成小塊,分多次推送,客戶端收到后再拼接起來。
除了這些坑,還有一些性能優化的點可以說說。
首先,Emitter 的數量不能太多。每個 Emitter 都會占用一個服務器連接,如果并發用戶特別多,Emitter 數量就會很多,會消耗大量的服務器資源。這時候,可能需要考慮其他方案,比如結合消息隊列,或者用 WebSocket 的廣播功能。
其次,推送的數據要盡量精簡。別把沒用的字段都往客戶端推,不僅浪費帶寬,還增加了序列化和反序列化的開銷。
再者,可以考慮使用壓縮。如果推送的文本數據比較多,可以開啟 Gzip 壓縮,能大大減少數據傳輸量。在 SpringBoot 里,開啟 Gzip 壓縮很簡單,在 application.properties 里配置一下就行:
server.compression.enabled=true
server.compression.mime-types=application/json,application/xml,text/html,text/plain最后,要做好監控和告警。比如監控 Emitter 的數量、發送數據的頻率、連接超時的次數等,一旦出現異常,及時告警,方便排查問題。咱們再來舉個實際點的例子,看看 ResponseBodyEmitter 在項目中具體能怎么用。
假設咱們要做一個實時日志查看功能,用戶在網頁上可以實時看到服務器的日志輸出。這時候,就可以用 ResponseBodyEmitter 來實現。
服務器端可以這樣搞:
@RestController
publicclass LogController {
@Autowired
private EmitterManager emitterManager;
@GetMapping("/logs/stream/{userId}")
public ResponseBodyEmitter logStream(@PathVariableString userId) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter(60000L);
emitterManager.addEmitter(userId, emitter);
// 啟動一個線程,實時讀取日志文件,并推送給客戶端
new Thread(() -> {
try {
// 這里只是示例,實際中可以用FileTailer之類的工具實時讀取日志
BufferedReader reader = new BufferedReader(new FileReader("application.log"));
String line;
while ((line = reader.readLine()) != null) {
// 如果Emitter已經關閉,就退出循環
if (emitter.isCompleted() || emitter.isClosed()) {
break;
}
emitter.send(line);
// 稍微休眠一下,避免推送太快
Thread.sleep(100);
}
reader.close();
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}客戶端頁面上,就可以用 JavaScript 實時接收日志,并顯示在頁面上:
<div id="logContainer" style="white-space: pre;"></div>
<script>
function connectLogStream(userId) {
fetch(`/logs/stream/${userId}`)
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
const logContainer = document.getElementById('logContainer');
function read() {
return reader.read().then(({ done, value }) => {
if (done) {
console.log('日志流結束,重新連接...');
setTimeout(() => connectLogStream(userId), 1000);
return;
}
const line = decoder.decode(value, { stream: true });
// 把新的日志行添加到頁面上
logContainer.textContent += line + '\n';
// 自動滾動到底部
logContainer.scrollTop = logContainer.scrollHeight;
return read();
});
}
return read();
})
.catch(error => {
console.error('日志流出錯,重新連接...', error);
setTimeout(() => connectLogStream(userId), 1000);
});
}
// 假設當前用戶ID是123
connectLogStream('123');
</script>這樣,用戶打開頁面后,就能實時看到服務器日志不斷地在頁面上刷新,就像在服務器上用 tail -f 命令看日志一樣,體驗還是挺不錯的。再比如,在一個數據導入的場景中,用戶上傳一個大文件,服務器在后臺異步處理,同時可以通過 ResponseBodyEmitter 實時向客戶端推送處理進度,比如 “已處理 10%”、“已處理 50%”、“處理完成” 等。客戶端收到這些進度信息后,就可以在頁面上顯示一個進度條,讓用戶知道當前的處理狀態,不至于以為頁面卡了。
說了這么多,總結一下 ResponseBodyEmitter 的優點:
- 用法簡單,集成在 SpringBoot 里,不用引入額外的依賴。
- 基于 HTTP 協議,兼容性好,客戶端不用做太多特殊處理。
- 支持異步流式推送,能提升用戶體驗。
- 線程安全,多個線程可以同時發送數據。
當然,它也有一些局限性:
- 主要是服務器單向推送,客戶端不能通過這個連接向服務器發送數據(除非再發一個 HTTP 請求)。
- 并發量大的時候,會占用較多的服務器連接資源。
- 依賴 HTTP 長連接,可能會被代理服務器、防火墻等中途斷開。
所以,在選擇技術方案的時候,要根據實際的業務場景來決定。如果是簡單的實時推送場景,ResponseBodyEmitter 是個不錯的選擇,足夠優雅,也足夠好用。如果是復雜的雙向實時通信場景,可能還是 WebSocket 更合適。
最后,再給大家提幾個在實際項目中使用 ResponseBodyEmitter 的小建議:
- 做好異常處理。無論是服務器發送數據出錯,還是客戶端連接斷開,都要妥善處理,避免程序崩潰,或者出現資源泄漏。
- 合理設置超時時間。根據業務的實際情況,設置一個合適的超時時間,既保證連接的穩定性,又不會浪費服務器資源。
- 考慮斷線重連機制。客戶端和服務器都要做好斷線重連的準備,保證推送的連續性。
- 進行充分的測試。特別是在高并發場景下,要測試 Emitter 的性能和穩定性,看看它能不能扛住壓力。
- 不要過度使用。不是所有場景都需要流式推送,簡單的 “一問一答” 模式能解決的問題,就別用這么復雜的東西。
總的來說,SpringBoot 的 ResponseBodyEmitter 是一個非常實用的工具,用好了能給咱們的項目帶來很大的便利,提升用戶體驗。它不像有些技術那樣晦澀難懂,只要理解了它的基本原理和用法,上手還是挺容易的。

































