From 766607c3f1af4025f1491008be9c6e3a09301285 Mon Sep 17 00:00:00 2001 From: xd <844539747@qq.com> Date: Mon, 29 Apr 2024 15:10:48 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../system/SysNoticeController.java | 70 +------- .../web/utils/SendNotice/NoticeUtil.java | 113 +++++++++++++ .../ruoyi/common/constant/RedisConstant.java | 6 + ruoyi-framework/pom.xml | 13 +- .../RedisMessageListenerConfig.java | 50 ++++++ .../redisWebsocket/RedisReceiver.java | 35 ++++ .../websocket/redisWebsocket/SendMsg.java | 20 +++ .../websocket/redisWebsocket/SendMsgAll.java | 46 ++++++ .../redisWebsocket/WebSocketBean.java | 37 +++++ .../redisWebsocket/WebSocketServerImpl.java | 35 ++++ .../redisWebsocket/WebsocketEndpoint.java | 25 +++ .../redisWebsocket/WebsocketEndpointImpl.java | 154 ++++++++++++++++++ .../redisWebsocket/WebsocketService.java | 24 +++ .../src/layout/components/NavbarNotice.vue | 6 +- 14 files changed, 568 insertions(+), 66 deletions(-) create mode 100644 ruoyi-admin/src/main/java/com/ruoyi/web/utils/SendNotice/NoticeUtil.java create mode 100644 ruoyi-common/src/main/java/com/ruoyi/common/constant/RedisConstant.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/RedisMessageListenerConfig.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/RedisReceiver.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/SendMsg.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/SendMsgAll.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebSocketBean.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebSocketServerImpl.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketEndpoint.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketEndpointImpl.java create mode 100644 ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketService.java diff --git a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SysNoticeController.java b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SysNoticeController.java index 5c5edcf..21856c2 100644 --- a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SysNoticeController.java +++ b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SysNoticeController.java @@ -2,17 +2,22 @@ package com.ruoyi.web.controller.system; import java.io.IOException; import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.stream.Collectors; import com.alibaba.fastjson2.JSONObject; +import com.ruoyi.common.constant.CacheConstants; +import com.ruoyi.common.constant.RedisConstant; import com.ruoyi.common.constant.WebsocketConst; import com.ruoyi.common.core.domain.model.LoginUser; +import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.utils.DateUtils; import com.ruoyi.framework.websocket.WebSocket; import com.ruoyi.system.domain.NoticeUserSelect; import com.ruoyi.system.domain.SysNoticeUser; +import com.ruoyi.system.domain.SysUserOnline; +import com.ruoyi.web.utils.SendNotice.NoticeUtil; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.DeleteMapping; @@ -31,6 +36,7 @@ import com.ruoyi.common.enums.BusinessType; import com.ruoyi.system.domain.SysNotice; import com.ruoyi.system.service.ISysNoticeService; +import javax.annotation.PostConstruct; import javax.websocket.Session; /** @@ -80,7 +86,7 @@ public class SysNoticeController extends BaseController noticeService.insertNotice(notice); //推送消息插入中间表 LoginUser loginUser = getLoginUser();//当前登陆者 - insertNoticeUser(loginUser,notice,noticeService,null); + NoticeUtil.sendNotices(loginUser,notice,noticeService,null); }catch (Exception e){ return error("系统异常"); } @@ -178,66 +184,6 @@ public class SysNoticeController extends BaseController return success(noticeService.selectNoticeById(noticeId)); } - /** - * 推送消息插入中间表 - * @param loginUser 发送对象排除自身 - * @param notice 发送信息对象 - * @param noticeService 接口 - */ - public static void insertNoticeUser(LoginUser loginUser,SysNotice notice,ISysNoticeService noticeService,List userNames) throws IOException { - List sysNoticeUsers = new ArrayList(); - - JSONObject obj = new JSONObject(); - obj.put(WebsocketConst.MSG_ID, notice.getNoticeId()); - obj.put(WebsocketConst.MSG_TITLE, notice.getNoticeTitle()); - obj.put(WebsocketConst.MSG_CONTENT, notice.getNoticeContent()); - - if(!"3".equals(notice.getNoticeType())){//系统通知 或 通知公告 - WebSocket.sendInfo(obj.toString(),null); - - SysNoticeUser sysNoticeUser = null; - CopyOnWriteArraySet webSocketSet = WebSocket.getWebSocketSet();//获取在线用户 - Date sendTime = DateUtils.getNowDate(); - for (WebSocket item : webSocketSet) { - String username = item.sid; - if(!username.equals(loginUser.getUsername())){ - sysNoticeUser = new SysNoticeUser(); - sysNoticeUser.setNoticeId(notice.getNoticeId()); - sysNoticeUser.setUserId(username); - sysNoticeUser.setSendUser(loginUser.getUser().getNickName()); - sysNoticeUser.setSendTime(sendTime); - sysNoticeUsers.add(sysNoticeUser); - } - } - if(!sysNoticeUsers.isEmpty()){ - noticeService.insertNoticeUserBatch(sysNoticeUsers); - } - }else{ - if(userNames!=null&&userNames.size()>0){ - for(String names:userNames){ - WebSocket.sendInfo(obj.toString(),names); - } - SysNoticeUser sysNoticeUser = null; - CopyOnWriteArraySet webSocketSet = WebSocket.getWebSocketSet();//获取在线用户 - Date sendTime = DateUtils.getNowDate(); - for (WebSocket item : webSocketSet) { - String username = item.sid; - if(userNames.contains(username)){ - sysNoticeUser = new SysNoticeUser(); - sysNoticeUser.setNoticeId(notice.getNoticeId()); - sysNoticeUser.setUserId(username); - sysNoticeUser.setSendUser(loginUser.getUser().getNickName()); - sysNoticeUser.setSendTime(sendTime); - sysNoticeUsers.add(sysNoticeUser); - } - } - if(!sysNoticeUsers.isEmpty()){ - noticeService.insertNoticeUserBatch(sysNoticeUsers); - } - } - } - } - /** * 获取对应消息对象的 人员 * @param noticeUserSelect diff --git a/ruoyi-admin/src/main/java/com/ruoyi/web/utils/SendNotice/NoticeUtil.java b/ruoyi-admin/src/main/java/com/ruoyi/web/utils/SendNotice/NoticeUtil.java new file mode 100644 index 0000000..e8cc2a5 --- /dev/null +++ b/ruoyi-admin/src/main/java/com/ruoyi/web/utils/SendNotice/NoticeUtil.java @@ -0,0 +1,113 @@ +package com.ruoyi.web.utils.SendNotice; + +import com.alibaba.fastjson2.JSONObject; +import com.ruoyi.common.constant.CacheConstants; +import com.ruoyi.common.constant.WebsocketConst; +import com.ruoyi.common.core.domain.model.LoginUser; +import com.ruoyi.common.core.redis.RedisCache; +import com.ruoyi.common.utils.DateUtils; +import com.ruoyi.common.utils.StringUtils; +import com.ruoyi.common.utils.spring.SpringUtils; +import com.ruoyi.framework.websocket.redisWebsocket.WebsocketService; +import com.ruoyi.system.domain.SysNotice; +import com.ruoyi.system.domain.SysNoticeUser; +import com.ruoyi.system.service.ISysNoticeService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.MessageSource; + +import javax.annotation.PostConstruct; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +public class NoticeUtil { + + @Autowired + private static RedisCache redis = SpringUtils.getBean(RedisCache.class); + + @Autowired + private static WebsocketService wbService = SpringUtils.getBean(WebsocketService.class); + + /** + * 推送消息插入中间表 + * @param loginUser 发送对象排除自身 + * @param notice 发送信息对象 + * @param noticeService 接口 + */ + public static void sendNotices(LoginUser loginUser, SysNotice notice, ISysNoticeService noticeService, List userIds) throws IOException { + JSONObject obj = new JSONObject(); + obj.put(WebsocketConst.MSG_ID, notice.getNoticeId()); + obj.put(WebsocketConst.MSG_TITLE, notice.getNoticeTitle()); + obj.put(WebsocketConst.MSG_CONTENT, notice.getNoticeContent()); + + if(!"3".equals(notice.getNoticeType())){//系统通知 或 通知公告 + + wbService.sendMessageAll("ruoyi",obj.toString()); + + insertNoticeUser(loginUser,notice,noticeService,null); + + }else{ + if(userIds!=null&&userIds.size()>0){ + for(String userId:userIds){ + wbService.sendMessageById("ruoyi",userId,obj.toString()); + } + + insertNoticeUser(loginUser,notice,noticeService,userIds); + + } + } + } + + public static void insertNoticeUser(LoginUser loginUser,SysNotice notice,ISysNoticeService noticeService, List userIds){ + List sysNoticeUsers = new ArrayList(); + //发送日期 + Date sendTime = DateUtils.getNowDate(); + //获取在线用户 + Collection keys = redis.keys(CacheConstants.LOGIN_TOKEN_KEY + "*"); + + if(userIds!=null&&userIds.size()>0){ + for (String key : keys) { + LoginUser user = redis.getCacheObject(key); + String userId = user.getUsername(); + if(userIds.contains(userId)){ + SysNoticeUser sysnoticeuser = noticeSet(loginUser,notice,userId,sendTime); + sysNoticeUsers.add(sysnoticeuser); + } + } + if(!sysNoticeUsers.isEmpty()){ + noticeService.insertNoticeUserBatch(sysNoticeUsers); + } + }else{ + for (String key : keys) { + LoginUser user = redis.getCacheObject(key); + String userId = user.getUsername(); + if(!userId.equals(loginUser.getUsername())){ + SysNoticeUser sysnoticeuser = noticeSet(loginUser,notice,userId,sendTime); + sysNoticeUsers.add(sysnoticeuser); + } + } + if(!sysNoticeUsers.isEmpty()){ + noticeService.insertNoticeUserBatch(sysNoticeUsers); + } + } + } + + /** + * 消息用户中间表设置 + * @param loginUser 当前登录者 + * @param notice 消息实体类 + * @param userId 在线用户账号 + * @param sendTime 发送时间 + * @return + */ + public static SysNoticeUser noticeSet(LoginUser loginUser,SysNotice notice,String userId,Date sendTime){ + SysNoticeUser sysNoticeUser = new SysNoticeUser(); + sysNoticeUser.setNoticeId(notice.getNoticeId()); + sysNoticeUser.setUserId(userId); + sysNoticeUser.setSendUser(loginUser.getUser().getNickName()); + sysNoticeUser.setSendTime(sendTime); + return sysNoticeUser; + } +} diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/constant/RedisConstant.java b/ruoyi-common/src/main/java/com/ruoyi/common/constant/RedisConstant.java new file mode 100644 index 0000000..338c504 --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/constant/RedisConstant.java @@ -0,0 +1,6 @@ +package com.ruoyi.common.constant; + +public class RedisConstant { + public static final String SYS_TOPIC = "sys_topic";//系统消息 + public static final String SYS_USER_TOPIC = "sys_user_topic";//用户消息 +} diff --git a/ruoyi-framework/pom.xml b/ruoyi-framework/pom.xml index 73b34fd..233276a 100644 --- a/ruoyi-framework/pom.xml +++ b/ruoyi-framework/pom.xml @@ -34,7 +34,11 @@ org.springframework.boot spring-boot-starter-websocket - + + org.java-websocket + java-websocket + 1.3.3 + com.alibaba @@ -64,7 +68,12 @@ com.ruoyi ruoyi-system + + org.projectlombok + lombok + 1.18.26 + - \ No newline at end of file + diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/RedisMessageListenerConfig.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/RedisMessageListenerConfig.java new file mode 100644 index 0000000..bc3eb6f --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/RedisMessageListenerConfig.java @@ -0,0 +1,50 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +import com.ruoyi.common.constant.RedisConstant; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; + +/** + * @author + */ +@Configuration +public class RedisMessageListenerConfig { + @Autowired + private RedisReceiver redisReceiver; + + /** + * 监听redis中的订阅信息 + * @param redisConnectionFactory + * @return + */ + @Bean + public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) { + RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); + redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); + //添加redis消息队列监听,监听im-topic消息主题的消息,使用messageListenerAdapter()中设置的类和方法处理消息。 + redisMessageListenerContainer.addMessageListener(messageListenerAdapter(), new PatternTopic(RedisConstant.SYS_USER_TOPIC)); + //同上一样 + redisMessageListenerContainer.addMessageListener(messageAllListenerAdapter(), new PatternTopic(RedisConstant.SYS_TOPIC)); + return redisMessageListenerContainer; + } + + /** + * 添加订阅消息处理类,通过反射获取处理类中的处理方法 + * 即使用RedisReceiver类中的sendMsg方法处理消息 + * @return + */ + @Bean + public MessageListenerAdapter messageListenerAdapter() { + return new MessageListenerAdapter(redisReceiver, "sendMsg"); + } + + @Bean + public MessageListenerAdapter messageAllListenerAdapter(){ + return new MessageListenerAdapter(redisReceiver, "sendAllMsg"); + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/RedisReceiver.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/RedisReceiver.java new file mode 100644 index 0000000..f9150c7 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/RedisReceiver.java @@ -0,0 +1,35 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +import com.alibaba.fastjson2.JSONObject; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 处理订阅redis的消息 + * @author + */ +@Component +public class RedisReceiver { + + @Resource + WebsocketEndpoint websocketEndpoint; + + /** + * 处理一对一消息 + * @param message 消息队列中的消息 + */ + public void sendMsg(String message) { + SendMsg msg = JSONObject.parseObject(message, SendMsg.class); + websocketEndpoint.sendMessageById(msg.getProjectId(),msg.getUserId(),msg.getMsg()); + } + + /** + * 处理广播消息 + * @param message + */ + public void sendAllMsg(String message){ + SendMsgAll msg = JSONObject.parseObject(message, SendMsgAll.class); + websocketEndpoint.batchSendMessage(msg.getProjectId(),msg.getMsg()); + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/SendMsg.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/SendMsg.java new file mode 100644 index 0000000..8c0ef78 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/SendMsg.java @@ -0,0 +1,20 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +/** + * 按用户推送 + * @author + */ +public class SendMsg extends SendMsgAll{ + /** + * 用户ID + */ + private String userId; + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/SendMsgAll.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/SendMsgAll.java new file mode 100644 index 0000000..fb4d75c --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/SendMsgAll.java @@ -0,0 +1,46 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +/** + * 推送全部 + */ +public class SendMsgAll { + /** + * websocket业务数据(json) + */ + private String msg; + + /** + * 业务模块类型 + */ + private String type; + + /** + * 项目ID + */ + private String projectId; + + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getProjectId() { + return projectId; + } + + public void setProjectId(String projectId) { + this.projectId = projectId; + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebSocketBean.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebSocketBean.java new file mode 100644 index 0000000..afa43f7 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebSocketBean.java @@ -0,0 +1,37 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +import javax.websocket.Session; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author + */ +public class WebSocketBean { + /** + * 连接session对象 + */ + private Session session; + + /** + * 连接错误次数 + */ + private AtomicInteger erroerLinkCount = new AtomicInteger(0); + + public int getErroerLinkCount() { + // 线程安全,以原子方式将当前值加1,注意:这里返回的是自增前的值 + return erroerLinkCount.getAndIncrement(); + } + + public void cleanErrorNum() { + // 清空计数 + erroerLinkCount = new AtomicInteger(0); + } + + public Session getSession() { + return session; + } + + public void setSession(Session session) { + this.session = session; + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebSocketServerImpl.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebSocketServerImpl.java new file mode 100644 index 0000000..e881a26 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebSocketServerImpl.java @@ -0,0 +1,35 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +import com.alibaba.fastjson2.JSON; +import com.ruoyi.common.constant.RedisConstant; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +/** + * @author + */ +@Service +public class WebSocketServerImpl implements WebsocketService { + + @Autowired + RedisTemplate redisTemplate; + + @Override + public void sendMessageAll(String projectId ,String message) { + SendMsgAll sendMsgAll = new SendMsgAll(); + sendMsgAll.setProjectId(projectId); + sendMsgAll.setMsg(message); + redisTemplate.convertAndSend(RedisConstant.SYS_TOPIC, JSON.toJSONString(sendMsgAll)); + + } + + @Override + public void sendMessageById(String projectId,String userId, String message) { + SendMsg sendMsg = new SendMsg(); + sendMsg.setProjectId(projectId); + sendMsg.setUserId(userId); + sendMsg.setMsg(message); + redisTemplate.convertAndSend(RedisConstant.SYS_USER_TOPIC,JSON.toJSONString(sendMsg)); + } +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketEndpoint.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketEndpoint.java new file mode 100644 index 0000000..4f1174a --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketEndpoint.java @@ -0,0 +1,25 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +/** + * 给客户端发送消息 + * @author + */ +public interface WebsocketEndpoint { + + + /** + * 向所有在线用户群发消息 + * @param projectId 项目ID + * @param message 发送给客户端的消息 + */ + void batchSendMessage(String projectId,String message); + + /** + * 发送给对应的用户 + * @param userId 用户的ID + * @param projectId 项目ID + * @param message 发送的消息 + */ + void sendMessageById(String projectId,String userId, String message); + +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketEndpointImpl.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketEndpointImpl.java new file mode 100644 index 0000000..56315b0 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketEndpointImpl.java @@ -0,0 +1,154 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author + */ +@Service +@ServerEndpoint(value = "/websocket/message/{projectId}/{userId}") +@Component +public class WebsocketEndpointImpl implements WebsocketEndpoint { + + private static Logger log = LoggerFactory.getLogger(WebsocketEndpointImpl.class); + /** + * 错误最大重试次数 + */ + private static final int MAX_ERROR_NUM = 3; + + /** + * 用来存放每个客户端对应的webSocket对象。 + */ + private static Map> webSocketInfo; + + static { + // concurrent包的线程安全map + webSocketInfo = new ConcurrentHashMap>(); + } + + + @OnOpen + public void onOpen(Session session, EndpointConfig config, @PathParam("userId") String userId,@PathParam("projectId") String projectId) { + WebSocketBean bean = new WebSocketBean(); + bean.setSession(session); + Map concurrentHashMap = new ConcurrentHashMap(); + concurrentHashMap.put(userId,bean); + webSocketInfo.put(projectId, concurrentHashMap); + log.info("ws项目:"+projectId+",客户端连接服务器userId :" + userId + "当前连接数:" + countUser(projectId)); + } + + @OnClose + public void onClose(Session session, @PathParam("userId") String userId,@PathParam("projectId") String projectId) { + // 客户端断开连接移除websocket对象 + Map concurrentHashMap = webSocketInfo.get(projectId); + if(concurrentHashMap != null){concurrentHashMap.remove(userId);} + log.info("ws项目:"+projectId+",客户端断开连接,当前连接数:" + countUser(projectId)); + + } + + @OnMessage + public void onMessage(Session session, String message, @PathParam("userId") String userId,@PathParam("projectId") String projectId) { + log.info("ws项目:"+projectId+",客户端 userId: " + userId + ",消息:" + message); + } + + @OnError + public void onError(Session session, Throwable throwable) { +// log.error("ws发生错误" + throwable.getMessage(), throwable); + } + + public void sendMessage(Session session, String message, String projectId, String userId) { + + log.info("ws项目:"+projectId+",连接数:"+countUser(projectId)+",发送消息 " + session); + try { + // 发送消息 + synchronized (session) { + if (session.isOpen()) { + session.getBasicRemote().sendText(message); + } + } + // 清空错误计数 + this.cleanErrorNum(projectId, userId); + } catch (Exception e) { + log.error("ws项目:"+projectId+",用户:"+userId+",发送消息失败" + e.getMessage(), e); + int errorNum = this.getErroerLinkCount(projectId, userId); + + // 小于最大重试次数重发 + if (errorNum <= MAX_ERROR_NUM) { + sendMessage(session, message, projectId, userId); + } else { + log.error("ws发送消息失败超过最大次数"); + // 清空错误计数 + this.cleanErrorNum(projectId, userId); + } + } + } + + @Override + public void batchSendMessage(String projectId,String message) { + Map concurrentHashMap = webSocketInfo.get(projectId); + if(concurrentHashMap != null){ + Set> set = concurrentHashMap.entrySet(); + for(Map.Entry map: set ){ + sendMessage(map.getValue().getSession(), message,projectId, map.getKey()); + } + } + } + + @Override + public void sendMessageById(String projectId,String userId, String message) { + Map concurrentHashMap = webSocketInfo.get(projectId); + if(concurrentHashMap != null){ + WebSocketBean webSocketBean = concurrentHashMap.get(userId); + if (webSocketBean != null) { + sendMessage(webSocketBean.getSession(), message, projectId,userId); + } + } + } + + /** + * 清空错误计数 + */ + private void cleanErrorNum(String projectId, String userId){ + Map concurrentHashMap = webSocketInfo.get(projectId); + if(concurrentHashMap != null){ + WebSocketBean webSocketBean = concurrentHashMap.get(userId); + if (webSocketBean != null) { + webSocketBean.cleanErrorNum(); + } + } + } + + /** + * 获取错误计数 + */ + private int getErroerLinkCount(String projectId, String userId){ + int errorNum = 0; + Map concurrentHashMap = webSocketInfo.get(projectId); + if(concurrentHashMap != null){ + WebSocketBean webSocketBean = concurrentHashMap.get(userId); + if (webSocketBean != null) { + errorNum = webSocketBean.getErroerLinkCount(); + } + } + return errorNum; + } + + private Integer countUser (String projectId){ + int size = 0; + Map concurrentHashMap = webSocketInfo.get(projectId); + if(concurrentHashMap != null) { + size = concurrentHashMap.size(); + } + return size; + } + +} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketService.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketService.java new file mode 100644 index 0000000..0a14aea --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/redisWebsocket/WebsocketService.java @@ -0,0 +1,24 @@ +package com.ruoyi.framework.websocket.redisWebsocket; + +/** + * 往Redis中存入消息 + * @author + */ +public interface WebsocketService { + + + + /** + * 向所有在线用户群发消息 + * @param message 发送给客户端的消息 + */ + void sendMessageAll(String projectId,String message); + + /** + * 发送给对应的用户 + * @param userId 用户的ID + * @param message 发送的消息 + */ + void sendMessageById(String projectId,String userId, String message); + +} diff --git a/ruoyi-ui/src/layout/components/NavbarNotice.vue b/ruoyi-ui/src/layout/components/NavbarNotice.vue index 0e8da9f..5f6928d 100644 --- a/ruoyi-ui/src/layout/components/NavbarNotice.vue +++ b/ruoyi-ui/src/layout/components/NavbarNotice.vue @@ -233,12 +233,14 @@ const webSocketApiUrl = 'ws://' + host + '/' + websocket_pattern; // 当前WebSocket的请求地址前缀, // /websocket/template-push/ 就是我后端配置的websocket端点地址 - let url = webSocketApiUrl + '/websocket/message/'+userName; + let url = webSocketApiUrl + '/websocket/message/ruoyi/'+userName; this.websock = new WebSocket(url) this.websock.onopen = this.websocketOnopen this.websock.onerror = this.websocketOnerror this.websock.onmessage = this.websocketOnmessage this.websock.onclose = this.websocketOnclose + + console.log('WebSocket 连接地址:'+host) }, websocketOnopen: function() { console.log('WebSocket连接成功') @@ -259,7 +261,7 @@ this.$notify({ title: '消息', type: 'warning', - duration: 2000, + duration: 5000, dangerouslyUseHTMLString: true, message: JSON.parse(e.data).noticeTitle });