Springboot 整合 Websocket 輕松實(shí)現(xiàn) IM 及時(shí)通訊
一、方案實(shí)踐
集成分為三步:添加依賴、增加配置類和消息核心類、前端集成。
1.1、添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.1.13.RELEASE</version>
</dependency>1.2、增加WebSocket配置類
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* WebSocket配置
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}1.3、增加消息核心類WebSocketServer
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
// 消息存儲(chǔ)
private static MessageStore messageStore;
// 消息發(fā)送
private static MessageSender messageSender;
public static void setMessageStore(MessageStore messageStore) {
WebSocketServer.messageStore = messageStore;
}
public static void setMessageSender(MessageSender messageSender) {
WebSocketServer.messageSender = messageSender;
}
/**
* 連接建立成功調(diào)用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
messageStore.saveSession(session);
}
/**
* 連接關(guān)閉調(diào)用的方法
*/
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
messageStore.deleteSession(session);
}
/**
* 收到客戶端消息后調(diào)用的方法
*
* @ Param message 客戶端發(fā)送過(guò)來(lái)的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws Exception {
log.warn("=========== 收到來(lái)自窗口" + session.getId() + "的信息:" + message);
handleTextMessage(session, new TextMessage(message));
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, @PathParam("userId") String userId, Throwable error) {
log.error("=========== 發(fā)生錯(cuò)誤");
error.printStackTrace();
// msgStore.deleteSession(session);
}
public void handleTextMessage(Session session, TextMessage message) throws Exception {
log.warn("=========== Received message: {}", message.getPayload());
}
}1.4、前端頁(yè)面加入socket
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/html">
<head>
<title>WebSocket Example</title>
</head>
<body>
登錄用戶ID:<input type="text" id="sendUserId" /></br>
接受用戶ID:<input type="text" id="receivedUserId" /></br>
發(fā)送消息內(nèi)容:<input type="text" id="messageInput" /></br>
接受消息內(nèi)容:<input type="text" id="messageReceive" /></br>
<button onclick="sendMessage()">Send</button>
<script>
var socket = new WebSocket("ws://localhost:8080/websocket/aaa");
var roomId = "123456";
// 隨機(jī)產(chǎn)出六位數(shù)字
var sendUserId = Math.floor(Math.random() * 1000000);
document.getElementById("sendUserId").value = sendUserId;
var messageReceive = document.getElementById("messageReceive");
socket.onopen = function (event) {
console.log("WebSocket is open now.");
let loginInfo = {
msgType: 2, //登錄消息
sendUserId: sendUserId,
bizType: 1, //業(yè)務(wù)類型
bizOptModule: 1, //業(yè)務(wù)模塊
roomId: roomId,
msgBody: {},
};
socket.send(JSON.stringify(loginInfo));
};
socket.onmessage = function (event) {
var message = event.data;
console.log("Received message: " + message);
messageReceive.value = message;
};
socket.onclose = function (event) {
console.log("WebSocket is closed now.");
};
function sendMessage() {
var message = document.getElementById("messageInput").value;
var receivedUserId = document.getElementById("receivedUserId").value;
let operateInfo = {
msgType: 100, //業(yè)務(wù)消息
sendUserId: sendUserId,
bizType: 1, //業(yè)務(wù)類型
bizOptModule: 1, //業(yè)務(wù)模塊
roomId: roomId,
receivedUserId: receivedUserId,
msgBody: {
operateType: 1, //操作類型:禁言
operateContent: "1",
},
};
socket.send(JSON.stringify(operateInfo));
}
setInterval(() => {
socket.send("ping");
}, 30000);
</script>
</body>
</html>二、小型及時(shí)通訊包含的模塊
以上只是集成了Websocket框架,實(shí)現(xiàn)了基本的全雙工通信,服務(wù)器和客戶端都可以同時(shí)發(fā)送和接收數(shù)據(jù)。要想實(shí)現(xiàn)一些小型完整的及時(shí)通訊,還需要具備以下幾個(gè)核心模塊。架構(gòu)圖如下:
2.1、架構(gòu)圖
圖片
2.2、消息對(duì)象模型
組織消息內(nèi)容,比如消息類型、發(fā)送者用戶ID、接受者用戶ID、具體的消息體等。比如:
public class SocketMsg<T> {
/**
* 消息類型:1心跳 2登錄 3業(yè)務(wù)操作
*/
private Integer msgType;
/**
* 發(fā)送者用戶ID
*/
private String sendUserId;
/**
* 接受者用戶ID
*/
private String receivedUserId;
/**
* 業(yè)務(wù)類型
*/
private Integer bizType;
/**
* 業(yè)務(wù)操作模塊
*/
private Integer bizOptModule;
/**
* 消息內(nèi)容
*/
private T msgBody;
}2.3、消息存儲(chǔ)模塊
負(fù)責(zé)存儲(chǔ)消息內(nèi)容、用戶ID和sessionID的關(guān)系,防止數(shù)據(jù)丟失或者服務(wù)器重啟等。
2.4、消息發(fā)送模塊
功能開發(fā)完畢,一般部署到分布式集群環(huán)境,所以通訊時(shí)session會(huì)分布在多臺(tái)服務(wù)器。比如用戶A的session在機(jī)器1,用戶B的session在機(jī)器2,此時(shí)A發(fā)送給B,就無(wú)法單獨(dú)通過(guò)機(jī)器1完成。
因?yàn)闄C(jī)器1拿不到機(jī)器2里的session,所以消息發(fā)不過(guò)去。此時(shí)只能借助別的中間件來(lái)實(shí)現(xiàn),比如借助消息中間件kafka實(shí)現(xiàn)。
機(jī)器1將消息發(fā)送給kafka,然后機(jī)器1和機(jī)器2都監(jiān)聽(tīng)kafka,然后查看用戶對(duì)應(yīng)的session是否在本機(jī),如果在本機(jī)則發(fā)送出去。
2.5、消息推送模塊
模塊3提到的消息發(fā)送流程中,消息發(fā)送給 消息中間件,然后服務(wù)器消費(fèi)到消費(fèi),在通過(guò)本機(jī)的session推送給客戶端。
三、遇到的幾個(gè)問(wèn)題
3.1、連接自動(dòng)斷開
WebSocket連接之后,發(fā)現(xiàn)一個(gè)問(wèn)題:每隔一段時(shí)間如果不傳送數(shù)據(jù)的話,與前端的連接就會(huì)自動(dòng)斷開。可以采用心跳消息的方式來(lái)解決這個(gè)問(wèn)題。比如客服端每隔30秒自動(dòng)發(fā)送ping消息給服務(wù)端,服務(wù)端返回pong。
3.2、Session無(wú)法被序列化
分布式場(chǎng)景會(huì)存在這樣的問(wèn)題:當(dāng)一次請(qǐng)求負(fù)載到第一臺(tái)服務(wù)器時(shí),session在第一臺(tái)服務(wù)器線程上,第二次請(qǐng)求,負(fù)載到第二臺(tái)服務(wù)器上,此時(shí)通過(guò)userId查找當(dāng)前用戶的session時(shí),是查找不到的。
本來(lái)想著把session存入到redis中,就可以從redis獲取用戶的session,希望用這種方式來(lái)解決分布式場(chǎng)景下消息發(fā)送的問(wèn)題。但是會(huì)出現(xiàn)如下錯(cuò)誤:
The remote endpoint was in state [STREAM_WRITING] which is an invalid state for called method翻看了session源碼,發(fā)現(xiàn)session無(wú)法被序列化。所以這個(gè)方案只能放棄。解決方案請(qǐng)看下面的問(wèn)題5或者上面的架構(gòu)圖。
3.3、對(duì)象無(wú)法自動(dòng)注入
使用了@ServerEndpoint注解的類中使用@Resource或@Autowired注入對(duì)象都會(huì)失敗,并且報(bào)空指針異常。
原因是WebSocket服務(wù)是線程安全的,那么當(dāng)我們?nèi)グl(fā)起一個(gè)ws連接時(shí),就會(huì)創(chuàng)建一個(gè)端點(diǎn)對(duì)象。WebSocket服務(wù)是多對(duì)象的,不是單例的。而我們的Spring的Bean默認(rèn)就是單例的,在非單例類中注入一個(gè)單例的Bean是沖突的。
或者說(shuō):
Spring管理采用單例模式(singleton),而 WebSocket 是多對(duì)象的,即每個(gè)客戶端對(duì)應(yīng)后臺(tái)的一個(gè) WebSocket 對(duì)象,也可以理解成 new 了一個(gè) WebSocket,這樣當(dāng)然是不能獲得自動(dòng)注入的對(duì)象了,因?yàn)檫@兩者剛好沖突。
@Autowired 注解注入對(duì)象操作是在啟動(dòng)時(shí)執(zhí)行的,而不是在使用時(shí),而 WebSocket 是只有連接使用時(shí)才實(shí)例化對(duì)象,且有多個(gè)連接就有多個(gè)對(duì)象。所以我們可以得出結(jié)論,這個(gè) Service 根本就沒(méi)有注入到 WebSocket 當(dāng)中。
如何解決呢?
使用靜態(tài)對(duì)象,并且對(duì)外暴露set方法,這樣在對(duì)象初始化的時(shí)候,將其注入到WebSocketServer中。比如說(shuō)這樣:
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
private static MessageStore messageStore;
private static MessageSender messageSender;
public static void setMessageStore(MessageStore messageStore) {
WebSocketServer.messageStore = messageStore;
}
public static void setMessageSender(MessageSender messageSender) {
WebSocketServer.messageSender = messageSender;
}
}
@Slf4j
@Service
public class MessageStore {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@PostConstruct
public void init() {
WebSocketServer.setMessageStore(this);
}
}3.4、分布式場(chǎng)景消息如何發(fā)給客戶端
問(wèn)題2中提到了分布式場(chǎng)景下存在的session不在本機(jī)的問(wèn)題,這種場(chǎng)景可以通過(guò)發(fā)送消息中間件的方式解決。具體這樣解決:
每次連接時(shí),都將userId和對(duì)應(yīng)的session存入到本機(jī),發(fā)送消息時(shí),直接發(fā)送給MQ-Broker,然后每臺(tái)應(yīng)用負(fù)載都去消費(fèi)這個(gè)消息,拿到消息之后,判斷在本機(jī)能根據(jù)userId是否能找到session,找到session則推送到客戶端。
3.5、部署時(shí)Nginx配置問(wèn)題
代碼開發(fā)完畢之后,本機(jī)跑通后,然后部署到服務(wù)器之后,還差很重要的一步:配置nginx代理。
3.5.1、給后端應(yīng)用部署獨(dú)立域名
要給后端應(yīng)用部署獨(dú)立域名,nginx代理直接轉(zhuǎn)發(fā)到應(yīng)用的獨(dú)立域名,不要走微服務(wù)的gateway網(wǎng)關(guān)轉(zhuǎn)發(fā)過(guò)去。
3.5.2、多層nginx轉(zhuǎn)發(fā)問(wèn)題
當(dāng)只有一層nginx的時(shí)候,配置比較簡(jiǎn)單,如下:
location ~* ^/api/websocket/* {
proxy_pass http://mangodwsstest.mangod.top;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
proxy_set_header Host mangodwsstest.mangod.top;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header X-Real-IP $remote_addr;
}但是,當(dāng)有兩層nginx轉(zhuǎn)發(fā)的時(shí)候,問(wèn)題就出現(xiàn)了。
在最外層的nginx需要使用如下配置,不能照抄后面一層的配置。proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for和proxy_set_header X-Forwarded-Proto $scheme這兩個(gè)配置不能少,用來(lái)將協(xié)議和真實(shí)IP傳遞給后面一層的nginx。
location ~* ^/api/websocket/* {
proxy_pass http://mangodwsstest.mangod.top;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}四、完整代碼和示例
4.1、頁(yè)面效果如下
開啟兩個(gè)web頁(yè)面,用戶1輸入用戶2的用戶ID,輸入發(fā)送消息內(nèi)容,點(diǎn)擊發(fā)送。在用戶2的頁(yè)面的接受消息內(nèi)容可以看到發(fā)送的消息。
圖片
圖片
4.2、代碼結(jié)構(gòu)
圖片
4.3、代碼地址
https://github.com/yclxiao/spring-websocket.git
五、總結(jié)
本文聊了Springboot如何集成Websocket、IM及時(shí)通訊需要哪些模塊、開發(fā)和部署過(guò)程中遇到的問(wèn)題、以及實(shí)現(xiàn)小型IM及時(shí)通訊的代碼。































