pringboot+webocket+vue网站实时在线人数(02) 实现步骤

Xsens动作捕捉 2023-04-16 6953

需求

  1. 捕捉用户登录和关闭页签的动作,用于统计实时在线人数
  2. 系统管理员可以查看到当前实时在线人数列表

效果

pringboot+webocket+vue网站实时在线人数(02) 实现步骤  第1张

实时人数监控

实现模型

pringboot+webocket+vue网站实时在线人数(02) 实现步骤  第2张

实现模型


  • 实现步骤

    用户前端登录后发起socket连接

    /** 获取当前用户信息 **/

    getUserInfo () {

    this.$http({

    url: this.$http.adornUrl(/sys/user/info),

    method: get,

    params: this.$http.adornParams()

    }).then(({data}) => {

    if (data && data.code === 0) {

    this.loading = false

    this.userId = data.user.userId

    this.userName = data.user.username

    this.name = data.user.name

    this.$store.commit(user/updateDept, data.user.dept)

    // 注册websocket事件

    this.startSocketBeats(this.userId,data.user.onlineUrl)

    }

    })

    },

    /** 创建websocket连接 **/

    startSocketBeats(token,url){

    if (!window[SocketNotice]) {

    // 创建socket实例

    window[SocketNotice] = new SocketNotice(url, token, 5)

    // window[SocketNotice] = new SocketNotice("ws://127.0.0.1:8099/saleapi/notice.ws", 1290904685997350914, 5)

    }

    if (!window[SocketNotice].connected) {

    // 发起socket连接

    window[SocketNotice].connect()

    }

    }

    socketNotice.js (webSocket相关工具类代码)

    注意processMessage方法,这里用于区分接收到的消息类型,根据不同类型去处理不同的页面监听器

    import UUID from uuid/v4

    const socketType = {

    // 心跳

    HEART: {

    msgType: 98

    }

    }

    export default class SocketNotice {

    /**

    * @param url

    * @param userId

    * @param pingIntervalInSeconds 心跳间隔,单位秒

    * @param maxConnectionBrokenTimes 最大的断开链接失败次数,超过就重连

    */

    constructor(url, userId, pingIntervalInSeconds = 5, maxConnectionBrokenTimes = 100) {

    this.socket = null

    this.connected = false// 是否已经联通了服务器

    this.heartHeatTimer = -1// 心跳定时器

    this.maxConnBrokenTimes = maxConnectionBrokenTimes// 默认的判断断线的丢包次数

    this.connBrokenTimes = 0// 默认的判断断线的丢包次数

    this.componentsBinded = new Map() // 存放页面组件的容器

    this.heartBeatData = JSON.stringify(socketType.HEART)

    this.connectUrl = url

    this.pingInterval = pingIntervalInSeconds * 1000

    this.wsTokenId = userId

    this.stopFlag = false// 是否停止socket

    this.defaultEventProcessor = {

    onclose: this.onClose.bind(this),

    onopen: this.onOpen.bind(this),

    onmessage: this.onMessage.bind(this),

    onerror: this.onError.bind(this)

    }

    }

    connect() {

    if (WebSocket in window) {

    this.socket = new WebSocket(this.connectUrl + ?onlineTokenId= + this.wsTokenId)

    this.setUpEvents()

    this.connected = true

    } else {

    this.connected = false

    console.log(当前浏览器 不支持 websocket)

    }

    }

    isConnected() {

    return this.connected

    }

    /***

    * 关闭当前socket

    */

    closeSocket(msg = 未指明, stop = false) {

    this.stopFlag = stop

    this.socket && this.socket.readyState === 1 && (this.socket.close() || (this.socket = null))

    clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)// 清除定时器

    if (stop) {

    console.log(`由于 < ${msg} > 的原因客户端主动关闭远程连接,请联系管理员`)

    }

    }

    stop() {

    this.stopFlag = true

    }

    /**

    * 注册事件消费者

    * UUID(receiver) 生成一个理论上不重复的128位16进制表示的数字

    * @param receiver

    */

    registerReceiver(receiver, copmName) {

    console.log(receiver, copmName)

    //

    if (!receiver || !UUID(receiver)) {

    throw new Error(Illegal arguments)

    }

    if (this.componentsBinded.has(copmName)) {

    console.error(`key <${copmName}> 组件已经存在! `)

    }

    this.componentsBinded.set(copmName, receiver)

    }

    unRegisterReceiver(copmName) {

    copmName && this.componentsBinded.delete(copmName)

    }

    // 发送消息

    send(message) {

    if (this.stopFlag) return

    this.socket.send(message)

    }

    processMessage(data) {

    // console.log(`接收到 WS 消息... ${JSON.stringify(data)}`)

    if (data.msgType === 1) {

    this.online(data)

    }

    }

    online(data) {

    const comp = this.componentsBinded.get(ONLINE)

    if (comp && !comp.compDestory && comp.onSocketMessage) {

    comp.onSocketMessage(data)

    }

    }

    beatHeart() {

    if (!this.stopFlag) this.send(this.heartBeatData)

    }

    /**

    * 设置心跳时间间隔 同时开启心跳

    * @param onConnectionBroken 处理 无回应的次数超过限制的 函数

    */

    startPing() {

    if (this.heartHeatTimer > -1) {

    clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)

    }

    if (this.pingInterval === -1 || !this.connected) {

    return

    }

    this.heartHeatTimer = setInterval(this.beatHeart.bind(this), this.pingInterval)

    // console.log(开始发送心跳 , 定时器id -->> + this.heartHeatTimer)

    }

    setUpEvents() {

    Object.assign(this.socket, this.defaultEventProcessor)

    }

    // 连接发生错误的回调方法

    onError(error) {

    console.log(WebSocket连接发生错误)

    console.log(error)

    };

    // 连接成功建立的回调方法

    onOpen() {

    this.stopFlag = false

    console.log(WebSocket连接成功 )

    this.startPing()

    }

    // 接收到消息的回调方法

    onMessage(event) {

    // console.log(`接收到socket消息 --> ${event.data} `)

    this.processMessage(JSON.parse(event.data))

    }

    /**

    * 连接关闭的回调方法

    * 需要在关闭的时候就重连.

    */

    onClose() {

    this.connected = false

    if (this.stopFlag) {

    return

    }

    console.log(WebSocket连接关闭)

    // 停止心跳

    clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)

    // 开始重连

    this.socket = null

    this.connBrokenTimes = this.connBrokenTimes + 1

    setTimeout(function () {

    console.log(socket断开链接 一秒钟之后,准备重试链接 ,已经尝试重连的次数: -->> + this.connBrokenTimes + , 最大次数: -> + this.maxConnBrokenTimes)

    if (this.connBrokenTimes > this.maxConnBrokenTimes) {

    console.log(`多次链接失败,可能是服务器出现故障`)

    return

    }

    if (this.connect()) {

    this.startPing()

    }

    }.bind(this), 1000)// 重连间隔时间是 一秒

    }

    }

    后端处理用户发起的socket连接

    1. 监听onOpen事件,根据token获取当前登录用户详情
    2. 记录发起连接的是哪个用户,放入缓存
    # 这部分根据具体业务自由发挥

    @Slf4j

    @Component

    @ServerEndpoint("/online.ws")

    public class OnlineWSServer {

    private SysUserService userService;

    private SaleDeptService deptService;

    private DeptUserRelService deptUserRelService;

    private static final String ONLINE_TOKEN_ID = "onlineTokenId";

    /**

    * 用来记录当前在线连接数。

    */

    private static AtomicInteger onlineCount = new AtomicInteger(0);

    private static NutMap onlineUserPool = new NutMap();

    @OnOpen

    public synchronized void onOpen(Session session) {

    this.userService = WsApplicationContextAware.getApplicationContext().getBean(SysUserService.class);

    this.deptService = WsApplicationContextAware.getApplicationContext().getBean(SaleDeptService.class);

    this.deptUserRelService = WsApplicationContextAware.getApplicationContext().getBean(DeptUserRelService.class);

    addOnlineCount();

    addOnlineUser(session);

    }

    @OnClose

    public synchronized void onClose(Session session) {

    log.error(JSONUtil.toJsonStr(session));

    String userId = getSessionToken(session);

    SysUserEntity user = userService.getById(userId);

    SysUserEntity onlineUser = onlineUserPool.getAs(userId, SysUserEntity.class);

    if (onlineUser != null) {

    AtomicInteger onlineCount = onlineUser.getOnlineCount();

    onlineCount.getAndDecrement();

    if (onlineCount.get() == 0) {

    onlineUserPool.remove(userId);

    WebSocketSessionManager.remove(userId, session);

    }

    }

    reduceOnlineCount();

    }

    @OnMessage

    public void onMessage(String message, Session session) {

    // log.info("接收到ws=[{}]的消息:{}", session, message);

    JSONObject jsonObject = JSONObject.parseObject(message);

    int msgType = jsonObject.getIntValue("msgType");

    if (msgType == 98) {

    NutMap pong = new NutMap();

    pong.put("time", new Date());

    pong.put("onlineCount", onlineCount.intValue());

    pong.put("msgType", msgType);

    try {

    // 发送实时人数到当前登录用户

    Set<Map.Entry<String, Object>> entries = onlineUserPool.entrySet();

    List<SysUserEntity> sysUserEntityList = new ArrayList<>();

    for (Map.Entry<String, Object> entry : entries) {

    SysUserEntity user = (SysUserEntity) entry.getValue();

    sysUserEntityList.add(user);

    }

    pong.put("userList", sysUserEntityList);

    pong.put("onlineUserPool", onlineUserPool);

    pong.put("msgType", 1);

    session.getBasicRemote().sendText(JSON.toJSONString(pong));

    } catch (Exception e) {

    log.error(e.getMessage(), e);

    }

    }

    }

    @OnError

    public void onError(Session session, Throwable error) {

    log.error("ws= < " + session.getId() + " > 内部错误", error);

    }

    private String getSessionToken(Session session) {

    List<String> id = session.getRequestParameterMap().get(ONLINE_TOKEN_ID);

    return id.stream().findFirst().orElse(StrUtil.EMPTY);

    }

    private String getDeptFullName(String itcode){

    List<SaleDeptEntity> deptList = deptUserRelService.getDeptListByUsernameNotExpired(itcode);

    if (CollectionUtil.isNotEmpty(deptList)){

    List<String> deptNames = deptList.stream().map(SaleDeptEntity::getDeptFullName).filter(Objects::nonNull).collect(Collectors.toList());

    return Joiner.on(",").join(deptNames);

    }else{

    return "";

    }

    }

    public void addOnlineUser(Session session) {

    String userId = getSessionToken(session);

    SysUserEntity user = userService.getById(userId);

    if(user==null){

    return;

    }

    String deptFullName = getDeptFullName(user.getUsername());

    if (BeanUtil.isNotEmpty(onlineUserPool.get(userId))) {

    SysUserEntity userEntity = (SysUserEntity) onlineUserPool.get(userId);

    userEntity.setDeptFullNames(deptFullName);

    userEntity.getOnlineCount().incrementAndGet();

    onlineUserPool.put(userId, userEntity);

    } else {

    user.setDeptFullNames(deptFullName);

    user.getOnlineCount().incrementAndGet();

    onlineUserPool.put(userId, user);

    }

    }

    /**

    * 原子性操作,在线连接数加一

    */

    public void addOnlineCount() {

    onlineCount.getAndIncrement();

    }

    /**

    * 原子性操作,在线连接数减一

    */

    public static void reduceOnlineCount() {

    onlineCount.getAndDecrement();

    }

    }

    演示连接效果

    接收到msgType=98的心跳请求,反馈msgType=1的在线用户信息

    pringboot+webocket+vue网站实时在线人数(02) 实现步骤  第3张


    下面被划掉的地方其实是不存在的,我测试时多发送了一次

    pringboot+webocket+vue网站实时在线人数(02) 实现步骤  第4张


    目前已经能实时地拿到用户信息,但是这些数据目前是没有被任何页面处理的,我们的需求是当管理员打开实时监控页面时,才去处理并展示这些数据,接下来要做的就是,在打开实时监控页面时,把该页面的实例添加到socket容器中,也就是注册,在页面关闭时将该实例在容器中移除.


    前端注册socket监听

    <template>

    <div>

    <el-table

    :data="tableData"

    border

    style="width: 100%">

    <el-table-column

    prop=""

    align="center"

    label="当前登录用户"

    width="auto">

    <el-table-column

    align="center"

    type="index"

    width="auto">

    </el-table-column>

    <el-table-column

    fixed

    align="center"

    prop="username"

    label="ITCODE"

    width="auto">

    </el-table-column>

    <el-table-column

    fixed

    align="center"

    prop="name"

    label="姓名"

    width="auto">

    </el-table-column>

    <el-table-column

    fixed

    align="center"

    prop="deptFullNames"

    label="部门"

    width="auto">

    </el-table-column>

    <el-table-column

    fixed

    align="center"

    prop="onlineCount"

    label="会话个数"

    width="auto">

    </el-table-column>

    </el-table-column>

    </el-table>

    </div>

    </template>

    <script>

    const COMP_NAME = ONLINE

    export default {

    name: index,

    created() {

    this.registerWebSocket(true)

    },

    destroyed() {

    this.registerWebSocket(false)

    },

    data() {

    return {

    tableData: []

    }

    },

    methods: {

    registerWebSocket(open) {

    if (open) {

    if (window[SocketNotice] && window[SocketNotice].registerReceiver) {

    window[SocketNotice].registerReceiver(this, COMP_NAME)

    }

    } else {

    if (window[SocketNotice] && window[SocketNotice].unRegisterReceiver) {

    window[SocketNotice].unRegisterReceiver(COMP_NAME)

    }

    }

    },

    onSocketMessage(data) {

    this.tableData = data.userList;

    }

    }

    }

    </script>

    <style scoped>

    </style>

    最终效果

    pringboot+webocket+vue网站实时在线人数(02) 实现步骤  第5张

    总结

    总的流程大概就是以下几点

    1. 客户端在用户登录后发起websocket连接
    2. 后端监听到有连接加入时就对用户信息进行记录,做相应的处理并进行反馈
    1. 客户端对指定页面和指定的消息类型进行监听和处理

    The End