pringboot+webocket+vue网站实时在线人数(02) 实现步骤
需求
- 捕捉用户登录和关闭页签的动作,用于统计实时在线人数
- 系统管理员可以查看到当前实时在线人数列表
效果
实现模型
实现步骤
用户前端登录后发起socket连接
/** 获取当前用户信息 **/
getUserInfo () {
this.$http({
url: this.$http.adornUrl(/sys/user/info),
method: get,
params: this.$http.adornParams()
}).then(({data}) => {
if (data && data.code === 0) {
this.loading = false
this.userId = data.user.userId
this.userName = data.user.username
this.name = data.user.name
this.$store.commit(user/updateDept, data.user.dept)
// 注册websocket事件
this.startSocketBeats(this.userId,data.user.onlineUrl)
}
})
},
/** 创建websocket连接 **/
startSocketBeats(token,url){
if (!window[SocketNotice]) {
// 创建socket实例
window[SocketNotice] = new SocketNotice(url, token, 5)
// window[SocketNotice] = new SocketNotice("ws://127.0.0.1:8099/saleapi/notice.ws", 1290904685997350914, 5)
}
if (!window[SocketNotice].connected) {
// 发起socket连接
window[SocketNotice].connect()
}
}
socketNotice.js (webSocket相关工具类代码)
注意processMessage方法,这里用于区分接收到的消息类型,根据不同类型去处理不同的页面监听器
import UUID from uuid/v4
const socketType = {
// 心跳
HEART: {
msgType: 98
}
}
export default class SocketNotice {
/**
* @param url
* @param userId
* @param pingIntervalInSeconds 心跳间隔,单位秒
* @param maxConnectionBrokenTimes 最大的断开链接失败次数,超过就重连
*/
constructor(url, userId, pingIntervalInSeconds = 5, maxConnectionBrokenTimes = 100) {
this.socket = null
this.connected = false// 是否已经联通了服务器
this.heartHeatTimer = -1// 心跳定时器
this.maxConnBrokenTimes = maxConnectionBrokenTimes// 默认的判断断线的丢包次数
this.connBrokenTimes = 0// 默认的判断断线的丢包次数
this.componentsBinded = new Map() // 存放页面组件的容器
this.heartBeatData = JSON.stringify(socketType.HEART)
this.connectUrl = url
this.pingInterval = pingIntervalInSeconds * 1000
this.wsTokenId = userId
this.stopFlag = false// 是否停止socket
this.defaultEventProcessor = {
onclose: this.onClose.bind(this),
onopen: this.onOpen.bind(this),
onmessage: this.onMessage.bind(this),
onerror: this.onError.bind(this)
}
}
connect() {
if (WebSocket in window) {
this.socket = new WebSocket(this.connectUrl + ?onlineTokenId= + this.wsTokenId)
this.setUpEvents()
this.connected = true
} else {
this.connected = false
console.log(当前浏览器 不支持 websocket)
}
}
isConnected() {
return this.connected
}
/***
* 关闭当前socket
*/
closeSocket(msg = 未指明, stop = false) {
this.stopFlag = stop
this.socket && this.socket.readyState === 1 && (this.socket.close() || (this.socket = null))
clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)// 清除定时器
if (stop) {
console.log(`由于 < ${msg} > 的原因客户端主动关闭远程连接,请联系管理员`)
}
}
stop() {
this.stopFlag = true
}
/**
* 注册事件消费者
* UUID(receiver) 生成一个理论上不重复的128位16进制表示的数字
* @param receiver
*/
registerReceiver(receiver, copmName) {
console.log(receiver, copmName)
//
if (!receiver || !UUID(receiver)) {
throw new Error(Illegal arguments)
}
if (this.componentsBinded.has(copmName)) {
console.error(`key <${copmName}> 组件已经存在! `)
}
this.componentsBinded.set(copmName, receiver)
}
unRegisterReceiver(copmName) {
copmName && this.componentsBinded.delete(copmName)
}
// 发送消息
send(message) {
if (this.stopFlag) return
this.socket.send(message)
}
processMessage(data) {
// console.log(`接收到 WS 消息... ${JSON.stringify(data)}`)
if (data.msgType === 1) {
this.online(data)
}
}
online(data) {
const comp = this.componentsBinded.get(ONLINE)
if (comp && !comp.compDestory && comp.onSocketMessage) {
comp.onSocketMessage(data)
}
}
beatHeart() {
if (!this.stopFlag) this.send(this.heartBeatData)
}
/**
* 设置心跳时间间隔 同时开启心跳
* @param onConnectionBroken 处理 无回应的次数超过限制的 函数
*/
startPing() {
if (this.heartHeatTimer > -1) {
clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)
}
if (this.pingInterval === -1 || !this.connected) {
return
}
this.heartHeatTimer = setInterval(this.beatHeart.bind(this), this.pingInterval)
// console.log(开始发送心跳 , 定时器id -->> + this.heartHeatTimer)
}
setUpEvents() {
Object.assign(this.socket, this.defaultEventProcessor)
}
// 连接发生错误的回调方法
onError(error) {
console.log(WebSocket连接发生错误)
console.log(error)
};
// 连接成功建立的回调方法
onOpen() {
this.stopFlag = false
console.log(WebSocket连接成功 )
this.startPing()
}
// 接收到消息的回调方法
onMessage(event) {
// console.log(`接收到socket消息 --> ${event.data} `)
this.processMessage(JSON.parse(event.data))
}
/**
* 连接关闭的回调方法
* 需要在关闭的时候就重连.
*/
onClose() {
this.connected = false
if (this.stopFlag) {
return
}
console.log(WebSocket连接关闭)
// 停止心跳
clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)
// 开始重连
this.socket = null
this.connBrokenTimes = this.connBrokenTimes + 1
setTimeout(function () {
console.log(socket断开链接 一秒钟之后,准备重试链接 ,已经尝试重连的次数: -->> + this.connBrokenTimes + , 最大次数: -> + this.maxConnBrokenTimes)
if (this.connBrokenTimes > this.maxConnBrokenTimes) {
console.log(`多次链接失败,可能是服务器出现故障`)
return
}
if (this.connect()) {
this.startPing()
}
}.bind(this), 1000)// 重连间隔时间是 一秒
}
}
后端处理用户发起的socket连接
- 监听onOpen事件,根据token获取当前登录用户详情
- 记录发起连接的是哪个用户,放入缓存
# 这部分根据具体业务自由发挥
@Slf4j
@Component
@ServerEndpoint("/online.ws")
public class OnlineWSServer {
private SysUserService userService;
private SaleDeptService deptService;
private DeptUserRelService deptUserRelService;
private static final String ONLINE_TOKEN_ID = "onlineTokenId";
/**
* 用来记录当前在线连接数。
*/
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static NutMap onlineUserPool = new NutMap();
@OnOpen
public synchronized void onOpen(Session session) {
this.userService = WsApplicationContextAware.getApplicationContext().getBean(SysUserService.class);
this.deptService = WsApplicationContextAware.getApplicationContext().getBean(SaleDeptService.class);
this.deptUserRelService = WsApplicationContextAware.getApplicationContext().getBean(DeptUserRelService.class);
addOnlineCount();
addOnlineUser(session);
}
@OnClose
public synchronized void onClose(Session session) {
log.error(JSONUtil.toJsonStr(session));
String userId = getSessionToken(session);
SysUserEntity user = userService.getById(userId);
SysUserEntity onlineUser = onlineUserPool.getAs(userId, SysUserEntity.class);
if (onlineUser != null) {
AtomicInteger onlineCount = onlineUser.getOnlineCount();
onlineCount.getAndDecrement();
if (onlineCount.get() == 0) {
onlineUserPool.remove(userId);
WebSocketSessionManager.remove(userId, session);
}
}
reduceOnlineCount();
}
@OnMessage
public void onMessage(String message, Session session) {
// log.info("接收到ws=[{}]的消息:{}", session, message);
JSONObject jsonObject = JSONObject.parseObject(message);
int msgType = jsonObject.getIntValue("msgType");
if (msgType == 98) {
NutMap pong = new NutMap();
pong.put("time", new Date());
pong.put("onlineCount", onlineCount.intValue());
pong.put("msgType", msgType);
try {
// 发送实时人数到当前登录用户
Set<Map.Entry<String, Object>> entries = onlineUserPool.entrySet();
List<SysUserEntity> sysUserEntityList = new ArrayList<>();
for (Map.Entry<String, Object> entry : entries) {
SysUserEntity user = (SysUserEntity) entry.getValue();
sysUserEntityList.add(user);
}
pong.put("userList", sysUserEntityList);
pong.put("onlineUserPool", onlineUserPool);
pong.put("msgType", 1);
session.getBasicRemote().sendText(JSON.toJSONString(pong));
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
@OnError
public void onError(Session session, Throwable error) {
log.error("ws= < " + session.getId() + " > 内部错误", error);
}
private String getSessionToken(Session session) {
List<String> id = session.getRequestParameterMap().get(ONLINE_TOKEN_ID);
return id.stream().findFirst().orElse(StrUtil.EMPTY);
}
private String getDeptFullName(String itcode){
List<SaleDeptEntity> deptList = deptUserRelService.getDeptListByUsernameNotExpired(itcode);
if (CollectionUtil.isNotEmpty(deptList)){
List<String> deptNames = deptList.stream().map(SaleDeptEntity::getDeptFullName).filter(Objects::nonNull).collect(Collectors.toList());
return Joiner.on(",").join(deptNames);
}else{
return "";
}
}
public void addOnlineUser(Session session) {
String userId = getSessionToken(session);
SysUserEntity user = userService.getById(userId);
if(user==null){
return;
}
String deptFullName = getDeptFullName(user.getUsername());
if (BeanUtil.isNotEmpty(onlineUserPool.get(userId))) {
SysUserEntity userEntity = (SysUserEntity) onlineUserPool.get(userId);
userEntity.setDeptFullNames(deptFullName);
userEntity.getOnlineCount().incrementAndGet();
onlineUserPool.put(userId, userEntity);
} else {
user.setDeptFullNames(deptFullName);
user.getOnlineCount().incrementAndGet();
onlineUserPool.put(userId, user);
}
}
/**
* 原子性操作,在线连接数加一
*/
public void addOnlineCount() {
onlineCount.getAndIncrement();
}
/**
* 原子性操作,在线连接数减一
*/
public static void reduceOnlineCount() {
onlineCount.getAndDecrement();
}
}
演示连接效果
接收到msgType=98的心跳请求,反馈msgType=1的在线用户信息
下面被划掉的地方其实是不存在的,我测试时多发送了一次
目前已经能实时地拿到用户信息,但是这些数据目前是没有被任何页面处理的,我们的需求是当管理员打开实时监控页面时,才去处理并展示这些数据,接下来要做的就是,在打开实时监控页面时,把该页面的实例添加到socket容器中,也就是注册,在页面关闭时将该实例在容器中移除.
前端注册socket监听
<template>
<div>
<el-table
:data="tableData"
border
style="width: 100%">
<el-table-column
prop=""
align="center"
label="当前登录用户"
width="auto">
<el-table-column
align="center"
type="index"
width="auto">
</el-table-column>
<el-table-column
fixed
align="center"
prop="username"
label="ITCODE"
width="auto">
</el-table-column>
<el-table-column
fixed
align="center"
prop="name"
label="姓名"
width="auto">
</el-table-column>
<el-table-column
fixed
align="center"
prop="deptFullNames"
label="部门"
width="auto">
</el-table-column>
<el-table-column
fixed
align="center"
prop="onlineCount"
label="会话个数"
width="auto">
</el-table-column>
</el-table-column>
</el-table>
</div>
</template>
<script>
const COMP_NAME = ONLINE
export default {
name: index,
created() {
this.registerWebSocket(true)
},
destroyed() {
this.registerWebSocket(false)
},
data() {
return {
tableData: []
}
},
methods: {
registerWebSocket(open) {
if (open) {
if (window[SocketNotice] && window[SocketNotice].registerReceiver) {
window[SocketNotice].registerReceiver(this, COMP_NAME)
}
} else {
if (window[SocketNotice] && window[SocketNotice].unRegisterReceiver) {
window[SocketNotice].unRegisterReceiver(COMP_NAME)
}
}
},
onSocketMessage(data) {
this.tableData = data.userList;
}
}
}
</script>
<style scoped>
</style>
最终效果
总结
总的流程大概就是以下几点
- 客户端在用户登录后发起websocket连接
- 后端监听到有连接加入时就对用户信息进行记录,做相应的处理并进行反馈
- 客户端对指定页面和指定的消息类型进行监听和处理