翼度科技»论坛 编程开发 .net 查看内容

基于WebSocket的实时消息传递设计

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
目录

概述

web管理系统中可以对业务数据执行新增和删除,现在需要当业务数据发生新增或删除操作后,尽可能实时的反应到WPF客户端上面。
web管理系统用VUE编写,后端服务为SpringBoot,WPF客户端基于.Netframework4.8编写。
整体架构

sequenceDiagram    title: 交互时序图            web前台->>+web后端服务:新增数据        Note over web前台,web后端服务:caremaId,labelInfo,......            web后端服务->>+WebSocketServer:创建websocker消息        Note over web后端服务,WebSocketServer:Must:cameraId=clientId            WPF客户端1-->>+WebSocketServer:创建监听    Note over WPF客户端1,WebSocketServer:clientId         WPF客户端2-->>+WebSocketServer:创建监听    Note over WPF客户端2,WebSocketServer:clientId    WebSocketServer->>WPF客户端1:分发websocker消息    Note over WebSocketServer,WPF客户端1:依据:cameraId=clientId            WebSocketServer->>WPF客户端2:分发websocker消息    Note over WebSocketServer,WPF客户端2:依据:cameraId=clientId设计

流程设计


  • 用户在浏览器界面执行新增业务数据操作,调用后端新增接口
  • WPF客户端在启动的时候初始化websocket客户端,并创建对server的监听
  • 后端新增接口先将数据落库,而后调用websocket服务端产生消息,消息在产生后立马被发送到了正在监听中的websocket-client
  • websocket-server和websocket-client是一对多的关系,如何保证业务数据被正确的分发?监听的时候给server端传递一个全局唯一的clientId,业务数据在产生的时候关联到一个BizId上面,只要保证clientId=BizId就可以了。
  • 删除流程和新增类似
程序设计

WebSocketServer

概述

WebSocketServer端采用SpringBoot框架实现,通过在springboot-web项目中集成 org.springframework.boot:spring-boot-starter-websocket
实现websocket的能力。
新增pom
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>
复制代码
新增配置类
  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  5. @Configuration
  6. @EnableWebSocket
  7. public class WebSocketConfig {
  8.     @Bean
  9.     public ServerEndpointExporter serverEndpointExporter() {
  10.         return new ServerEndpointExporter();
  11.     }
  12. }
复制代码
创建websocket端点
  1. import com.alibaba.fastjson.JSON;
  2. import org.springframework.stereotype.Component;
  3. import javax.websocket.*;
  4. import javax.websocket.server.PathParam;
  5. import javax.websocket.server.ServerEndpoint;
  6. import java.io.IOException;
  7. import java.util.concurrent.ConcurrentHashMap;
  8. @ServerEndpoint("/ws/label/{clientId}")
  9. @Component
  10. public class LabelWebSocket {
  11.     /**
  12.      * session list
  13.      */
  14.     private static ConcurrentHashMap<String, Session> sessionList = new ConcurrentHashMap<>();
  15.     /**
  16.      * 当前 clientId
  17.      */
  18.     private String currentClientId = "";
  19.     @OnOpen
  20.     public void open(Session session, @PathParam("clientId") String clientId) throws IOException {
  21.         if (sessionList.containsKey(clientId)) {
  22.             sessionList.remove(clientId);
  23.         }
  24.         sessionList.put(clientId, session);
  25.         currentClientId = clientId;
  26.         this.sendMsg(session, "connectok");
  27.     }
  28.     @OnClose
  29.     public void close(Session session) throws IOException {
  30.         sessionList.remove(currentClientId);
  31.         System.out.println("连接关闭,session=" + JSON.toJSONString(session.getId()));
  32.     }
  33.     @OnMessage
  34.     public void receiveMsg(Session session, String msg) throws IOException {
  35.         this.sendMsg(session, "接收到的消息为:" + msg);
  36. //        throw new RuntimeException("主动抛异常");
  37.     }
  38.     @OnError
  39.     public void error(Session session, Throwable e) throws IOException {
  40.         System.out.println("连接异常,session=" + JSON.toJSONString(session.getId()) + ";currentClientId=" + currentClientId);
  41.         this.sendMsg(session, "发生异常,e=" + e.getMessage());
  42.         e.printStackTrace();
  43.     }
  44.     /**
  45.      * @param clientId
  46.      * @param msg
  47.      */
  48.     public boolean sendMsg(String clientId, String msg) throws IOException {
  49.         if (sessionList.containsKey(clientId)) {
  50.             Session session = sessionList.get(clientId);
  51.             this.sendMsg(session, msg);
  52.             return true;
  53.         } else {
  54.             return false;
  55.         }
  56.     }
  57.     private void sendMsg(Session session, String msg) throws IOException {
  58.         session.getBasicRemote().sendText(msg);
  59.     }
  60. }
复制代码
WebSocketClient

概述

WebSocketClient端集成在WPF应用客户端中,通过前期调研,选中 WebSocketSharp  作为websocketclient工具,WebSocketSharp 是托管在Github的开源项目,MITLicense,目前4.9K的star。
安装WebSocketSharp
  1. //nuget
  2. Install-Package WebSocketSharp -Pre
复制代码
初始化client
  1. WebSocket ws = new WebSocket("ws://127.0.0.1:8083/ws/xx/clientId");
复制代码
创建连接
  1. private void InitWebSocket()
  2. {
  3.     ws.OnOpen += (sender, e) =>
  4.     {
  5.         Console.WriteLine("onOpen");
  6.     };
  7.     //允许ping
  8.     ws.EmitOnPing = true;
  9.     //接收到xiaoxi
  10.     ws.OnMessage += (sender, e) =>
  11.     {
  12.         ReceiveMessage(sender, e);
  13.     };
  14.     ws.Connect();
  15.     //发送消息
  16.     //ws.Send("BALUS")
  17.     ;
  18. }
  19. private void ReceiveMessage(object sender, MessageEventArgs e)
  20. {
  21.     if (e.IsText)
  22.     {
  23.         // Do something with e.Data.like jsonstring
  24.         Console.WriteLine(e.Data);
  25.         return;
  26.     }
  27.     if (e.IsBinary)
  28.     {
  29.         // Do something with e.RawData. like  byte[]
  30.         return;
  31.     }
  32.     if (e.IsPing)
  33.     {
  34.         // Do something to notify that a ping has been received.
  35.         return;
  36.     }
  37. }
复制代码
跨线程更新UI

由于 WebSocketSharp 会创建线程来处理 ReceiveMessage ,而WPF中子线程是无法更新UI的,所以需要引入 Dispatcher 来实现跨线程更新UI。
获取当前线程名字
  1.  //当前线程
  2. string name = Thread.CurrentThread.ManagedThreadId.ToString();
复制代码
示例代码
  1. private void ReceiveMessage(object sender, MessageEventArgs e)
  2. {
  3.     if (e.IsText)
  4.     {
  5.         // Do something with e.Data.like jsonstring
  6.         Console.WriteLine(e.Data);
  7.         //当前线程
  8.         string name = Thread.CurrentThread.ManagedThreadId.ToString();
  9.         App.Current.Dispatcher.Invoke((Action)(() =>
  10.         {
  11.             Image lab = new Image();
  12.             lab.Uid = "123456";
  13.             lab.Name = "labName";
  14.             lab.Width = 50; lab.Height = 50;
  15.             string url = "http://xxx:xxx/img/louyu.png";
  16.             BitmapImage bitmapImage = HttpUtil.getImage(url);
  17.             lab.Source = bitmapImage;
  18.             lab.AddHandler(MouseLeftButtonDownEvent, new MouseButtonEventHandler(LabelClick));
  19.             Canvas.SetTop(lab, 800);
  20.             Canvas.SetLeft(lab, 600);
  21.             this.cav.Children.Add(lab);
  22.         }));
  23.         return;
  24.     }
  25. }
复制代码
接口设计

新增接口

概述

目前WebSocketServer和web后端服务是在同一个SpringBoot的工程中,所以只要将WebSocketServer托管到SpringContainer中,web后端服务可以通过 DI 的方式直接访问 WebSocketEndPoint。
如果考虑程序的低耦合,可以在WebSocketServer和web后端服务之间架设一个MQ。
核心代码
  1.     @Autowired
  2.     private LabelWebSocket ws;
  3.     @GetMapping("/create")
  4.     public boolean createLabel() throws IOException {
  5.         String cameraId = "cml";
  6.         //todo
  7.         boolean result = ws.sendMsg(cameraId, "新增标签");
  8.         return result;
  9.     }
复制代码
风险

分布式风险

当前在 WebSocketServer 中,已经连接的client信息是记录在当前进程的cache中,如果服务做横向扩容,cache信息无法在多实例进程中传递,将导致无法正确的处理业务数据,并可能会发生意想不到的异常和bug,此问题在并发越高的情况下造成的影响越大
资源风险

web后端服务为基于java语言的springboot程序,这种类型程序的特点是内存消耗特别严重。WebSocketServer服务在本项目中仅用作消息中间件,连通web后端服务和WPF客户端。
首先WebSocketServer没有太多的计算能力的消耗,内存消耗会随着连接客户端数量的增长而增长,网络将是最大的开销,一方面需要转发来自web后端服务的业务数据,并和WPF客户端保持长连接;另一方面WebSocketServer和WPF客户端的交互可能会走公网,而其和web后端服务必然是在局域网环境。
综上,将web后端服务和WebSocketServer分开部署对于硬件资源成本和利用率来说是最好的选择。
高可用风险

未引入重试机制,当某一个环节失败之后,将导致异常情况发生。

来源:https://www.cnblogs.com/Naylor/archive/2023/01/19/17061788.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具