翼度科技»论坛 编程开发 .net 查看内容

一个简易socket通信结构

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
服务端

工作需要又需要用到socketTCP通讯,这么多年了,终于稍微能写点了。让我说其实也说不出个啥来,看了很多的异步后稍微对异步socket的导流 endreceive后 再beginreceive 形成一个内循环有了个认识,加上我自己的封包拆包机制,然后再仿那些其它的大多数代码结构弄点onReceive事件进行 收包触发。整个过程就算差不多了 ,基本上是能够可靠运行的 靠谱的 中规中矩的,要说啥创新读到嘛真的谈不上。代码中写了很多low逼注释 也是为了方便自己理解 请无视。下面是server端代码,使用异步机制accept 异步receive ,成员有 clients代表当前在线的客户端 客户端socket包装为EndpointClient ,有onClientAddDel 代表客户端上线掉线事件,有onReceive代表所有客户端的收包事件,clients由于是异步的多线程访问就要涉及多线程管控 所以使用lock ,服务端有sendToAll() 和SendToSomeOne()毫无疑问这也是通过调用特定的clients来做的。
以下是服务端代码
  1.   1 public class MsgServerSchedule
  2.   2 {
  3.   3
  4.   4
  5.   5     Socket serverSocket;
  6.   6     public Action<List<string>> onClientAddDel;
  7.   7     public Action<Telegram_Base> onReceive;
  8.   8     bool _isRunning = false;
  9.   9
  10. 10     
  11. 11     int port;
  12. 12
  13. 13     public TelgramType telType;
  14. 14
  15. 15     static List<EndpointClient> clients;
  16. 16
  17. 17     public bool isRunning { get { return _isRunning; } }
  18. 18     public MsgServerSchedule(int _port)
  19. 19     {
  20. 20         //any 就决定了 ip地址格式是v4
  21. 21         //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654);
  22. 22         //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  23. 23
  24. 24         this.port = _port;
  25. 25
  26. 26         clients = new List<EndpointClient>();
  27. 27
  28. 28         Console.WriteLine("constructor");
  29. 29
  30. 30     }
  31. 31
  32. 32     public void Start()
  33. 33     {
  34. 34         try
  35. 35         {
  36. 36             IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
  37. 37             serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  38. 38             serverSocket.Bind(endPoint);
  39. 39             serverSocket.Listen(port);
  40. 40
  41. 41             serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket);
  42. 42
  43. 43             _isRunning = true;
  44. 44             Console.WriteLine("start");
  45. 45         }
  46. 46         catch (Exception ex)
  47. 47         {
  48. 48             _isRunning = false;
  49. 49             serverSocket = null;
  50. 50
  51. 51             Console.WriteLine("服务启动出现错误,可能端口已被占用:"+port);
  52. 52             Console.WriteLine(ex.Message);
  53. 53         }
  54. 54        
  55. 55     }
  56. 56
  57. 57     public void Stop()
  58. 58     {
  59. 59         for (int i = 0; i < clients.Count; i++)
  60. 60         {
  61. 61             clients[i].Close();               
  62. 62         }
  63. 63         ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll);
  64. 64         serverSocket.Close();
  65. 65         _isRunning = false;
  66. 66     }
  67. 67
  68. 68     public void SendToAll(Telegram_Base tel)
  69. 69     {
  70. 70         for (int i = 0; i < clients.Count; i++)
  71. 71         {
  72. 72             clients[i].Send(tel);
  73. 73         }
  74. 74     }
  75. 75
  76. 76     public void SendToSomeOne(Telegram_Base tel)
  77. 77     {
  78. 78         for (int i = 0; i < clients.Count; i++)
  79. 79         {
  80. 80             if(clients[i].remoteIPPort==tel.remoteIPPort)
  81. 81             {
  82. 82                 clients[i].Send(tel);
  83. 83                 break;
  84. 84             }
  85. 85         }
  86. 86     }
  87. 87
  88. 88     //新增与删除客户端 秉持原子操作
  89. 89     List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType)
  90. 90     {
  91. 91         //异步同时有新客户端上线 与下线 不进行资源互斥访问会报错
  92. 92         lock (this)
  93. 93         {
  94. 94             if (changeType == EndPointClientsChangeType.Add)
  95. 95             {
  96. 96                 clients.Add(cli);
  97. 97             }
  98. 98             else if(changeType== EndPointClientsChangeType.Del)
  99. 99             {
  100. 100                 var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort);
  101. 101                 if (beRemoveClient != null)
  102. 102                     clients.Remove(beRemoveClient);
  103. 103             }
  104. 104             else if(changeType== EndPointClientsChangeType.ClearAll)
  105. 105             {
  106. 106                 clients.Clear();
  107. 107             }
  108. 108             else if (changeType == EndPointClientsChangeType.GetAll)
  109. 109             {
  110. 110                 List<string> onLines = new List<string>(clients.Count);
  111. 111                 for (int i = 0; i < clients.Count; i++)
  112. 112                 {
  113. 113                     onLines.Add(clients[i].remoteIPPort);
  114. 114                 }
  115. 115                 return onLines;
  116. 116             }
  117. 117             else
  118. 118             {
  119. 119                 return null;
  120. 120             }
  121. 121         }
  122. 122         return null;
  123. 123     }
  124. 124     //异步监听客户端 有客户端到来时的回调
  125. 125     private void AcceptCallback(IAsyncResult iar)
  126. 126     {
  127. 127         //server端一直在receive 能够感知到客户端掉线 (连上后 关闭客户端 server立即有错误爆出)
  128. 128         //但是同情况 关闭server端 客户端无错误爆出 直到点发送 才有错误爆出
  129. 129         //由此得出 处于receive才会有掉线感知  ,send时发现发不出去自然也会有感知 跟人的正常思维理解是一样的
  130. 130         //虽然tcp是所谓的长连接 ,通过反复测试  ->但是双方相互都处在一个静止状态 是无法 确定在不在的  
  131. 131         //连上后平常的情况下 并没有数据流通 的 ,双方只是一个状态的保持而已。
  132. 132         //这也是为什么 好多服务 客户端 程序 都有个心跳机制(由此我们可以想到继续完善 弄个客户端列表 心跳超时的剔除列表 正常发消息send 或receive 异常的剔除列表 删除clientSocket
  133. 133         //其实非要说吧 一般情况 服务端一直在receive 不用心跳其实也是可以的(客户端可能是真的需要
  134. 134         //tcp底层就已经有了一个判断对方在不在的机制 , 对方直接关程序 结束进程 这些 只要tcp在receive就立即能够感知 所以说心跳 用不用看情况吧
  135. 135
  136. 136         //tcp 不会丢包 哪怕是连接建立了   你还没开始receive   对方却先发了,
  137. 137         //对方只要是发了的数据 都由操作系统像个缓存样给你放那的 不会掉 你再隔10秒开始receive都能rec的到
  138. 138
  139. 139         //tcp甚至在拔掉网线 再重新插上 都可以保证数据一致性
  140. 140         //tcp的包顺序能够保证 先发的先到
  141. 141
  142. 142         //nures代码中那些beginreceivexxx  异步receive的核心机制就是 ,假定数据到的时候把数据保存到xxx数组
  143. 143         //真正endreceive的时候 其实数据已经接收 处理完成了
  144. 144
  145. 145         try
  146. 146         {
  147. 147
  148. 148             //处理完当前accept
  149. 149             Socket currentSocket = serverSocket.EndAccept(iar);
  150. 150
  151. 151             EndpointClient client = new EndpointClient(currentSocket,telType);
  152. 152
  153. 153             //新增客户端
  154. 154             ClientAddDelGetList(client, EndPointClientsChangeType.Add);
  155. 155            
  156. 156             if (onClientAddDel != null)
  157. 157             {
  158. 158                 List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
  159. 159                 onClientAddDel(onlines);
  160. 160
  161. 161                 //客户端异常掉线
  162. 162                 client.onClientDel = new Action<string>((_remoteIPPort) =>
  163. 163                 {
  164. 164                     ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del);
  165. 165
  166. 166                     List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
  167. 167                     onClientAddDel(onlines2);
  168. 168                 });
  169. 169             }
  170. 170
  171. 171            
  172. 172
  173. 173             //这句到时调用完成后 就会自动把 receivebuffer填充 //要接收的字节数 系统底层操作一次接收多少字节 其实意义不大
  174. 174             //总是从0开始(就是说并发时会覆盖
  175. 175
  176. 176             Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString()));
  177. 177
  178. 178             //currentSocket.Close();
  179. 179             //Application.Exit();
  180. 180
  181. 181             //Thread.Sleep(1000 * 10);
  182. 182             client.onReceive += this.onReceive;
  183. 183
  184. 184             client.BeginReceive();
  185. 185
  186. 186
  187. 187             //立即开始accept新的客户端
  188. 188             if (isRunning == true && serverSocket != null)
  189. 189                 serverSocket.BeginAccept(AcceptCallback, serverSocket);
  190. 190             //beginAccept 最开始的方法可以不一样 ,但最终肯定是一个不断accept的闭环过程
  191. 191             //整个过程就像个导流样 ,最开始用异步导流到一个固定的点 然后让其循环源源不断运转
  192. 192
  193. 193             //加asynccallback 有什么不一样么
  194. 194             //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket);
  195. 195
  196. 196         }
  197. 197         catch (Exception ex)
  198. 198         {
  199. 199             Console.WriteLine("AcceptCallback Error");
  200. 200             Console.WriteLine(ex.Message);
  201. 201         }
  202. 202
  203. 203     }
  204. 204
  205. 205   
  206. 206 }
复制代码
EndpointClient终端代码代表客户端的对口人,他的onReceive 等资源从服务端继承 ,如果服务端想给某个特定客户端发数据则会调用他们中的某一个 毫无疑问这是通过remoteIPport来判断的,这些都是编写基本socket结构轻车熟路的老套路
以下EndpointClient代码
  1.   1 public class EndpointClient
  2.   2 {
  3.   3     Socket workingSocket;
  4.   4     static int receiveBufferLenMax = 5000;
  5.   5     byte[] onceReadDatas = new byte[receiveBufferLenMax];
  6.   6     List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax);
  7.   7
  8.   8     public string remoteIPPort { get; set; }
  9.   9     
  10. 10     //当前残留数据区 接收数据的起始指针(也代表缓冲区数据长度
  11. 11     int receiveBufferLen = 0;
  12. 12
  13. 13
  14. 14     TelgramType telType;
  15. 15
  16. 16     public Action<Telegram_Base> onReceive;
  17. 17     public Action<string> onClientDel;
  18. 18
  19. 19     public EndpointClient()
  20. 20     {
  21. 21
  22. 22     }
  23. 23     public EndpointClient(Socket _socket,TelgramType _telType)
  24. 24     {
  25. 25         this.remoteIPPort = _socket.RemoteEndPoint.ToString();
  26. 26         this.telType = _telType;
  27. 27         workingSocket = _socket;
  28. 28     }
  29. 29
  30. 30     public void Send(Telegram_Base tel)
  31. 31     {
  32. 32         //try
  33. 33         //{
  34. 34             if(workingSocket==null)
  35. 35             {
  36. 36                 Console.WriteLine("未初始化的EndpointClient");
  37. 37                 return;
  38. 38             }
  39. 39             if (tel is Telegram_Schedule)
  40. 40             {
  41. 41                 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
  42. 42                 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
  43. 43                 {
  44. 44                     Console.WriteLine("尝试发送数据长度格式错误的报文");
  45. 45                     return;
  46. 46                 }
  47. 47
  48. 48                 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
  49. 49                 byte[] sendbytes = telBeSend.dataBytes;
  50. 50
  51. 51                 //数据超过缓冲区长度 会导致无法拆包
  52. 52                 if (sendbytes.Length <= receiveBufferLenMax)
  53. 53                 {
  54. 54                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
  55. 55                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null
  56. 56                     
  57. 57                     );
  58. 58                 }
  59. 59                 else
  60. 60                 {
  61. 61                     Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
  62. 62                     throw new Exception("发送到调度客户端的数据超过缓冲区长度");
  63. 63                 }
  64. 64
  65. 65             }
  66. 66             else if (tel is Telegram_SDBMsg)
  67. 67             {
  68. 68
  69. 69             }
  70. 70
  71. 71         //}
  72. 72         //catch (Exception ex)
  73. 73         //{
  74. 74
  75. 75         //    Console.WriteLine(ex.Message);
  76. 76         //    throw ex;
  77. 77         //}
  78. 78     }
  79. 79
  80. 80     public void BeginReceive()
  81. 81     {
  82. 82         if (workingSocket == null)
  83. 83         {
  84. 84             Console.WriteLine("未初始化的EndpointClient");
  85. 85             return;
  86. 86         }
  87. 87
  88. 88         receiveBufferLen = 0;
  89. 89         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
  90. 90             ReceiveCallback,
  91. 91         this);
  92. 92     }
  93. 93     private void ReceiveCallback(IAsyncResult iar)
  94. 94     {
  95. 95         try
  96. 96         {
  97. 97             EndpointClient cli = (EndpointClient)iar.AsyncState;
  98. 98             Socket socket = cli.workingSocket;
  99. 99             int reads = socket.EndReceive(iar);
  100. 100
  101. 101             if (reads > 0)
  102. 102             {
  103. 103
  104. 104                 for (int i = 0; i < reads; i++)
  105. 105                 {
  106. 106                     receiveBuffer.Add(onceReadDatas[i]);
  107. 107                 }
  108. 108
  109. 109                 //具体填充了多少看返回值 此时 数据已经在buffer中了
  110. 110                 receiveBufferLen += reads;
  111. 111                 //加完了后解析 阻塞式处理 结束后开启新的接收
  112. 112                 SloveTelData();
  113. 113
  114. 114                 if (receiveBufferLenMax - receiveBufferLen > 0)
  115. 115                 {
  116. 116                     //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
  117. 117                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
  118. 118                 }
  119. 119                 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
  120. 120                 {
  121. 121                     Close();
  122. 122                     //移除自己
  123. 123                     if (onClientDel != null)
  124. 124                     {
  125. 125                         onClientDel(remoteIPPort);
  126. 126                     }
  127. 127                     Console.WriteLine("服务端接口解析数据出现异常");
  128. 128                     throw new Exception("服务端接口解析数据出现异常");
  129. 129                 }
  130. 130             }
  131. 131             else//reads==0 客户端已关闭
  132. 132             {
  133. 133                 Close();
  134. 134                 //移除自己
  135. 135                 if (onClientDel != null)
  136. 136                 {
  137. 137                     onClientDel(remoteIPPort);
  138. 138                 }
  139. 139             }
  140. 140         }
  141. 141         catch (Exception ex)
  142. 142         {
  143. 143             Close();
  144. 144             //移除自己
  145. 145             if (onClientDel != null)
  146. 146             {
  147. 147                 onClientDel(remoteIPPort);
  148. 148             }
  149. 149
  150. 150             Console.WriteLine("ReceiveCallback Error");
  151. 151             Console.WriteLine(ex.Message);
  152. 152         }
  153. 153
  154. 154     }
  155. 155     void SloveTelData()
  156. 156     {
  157. 157         //进行数据解析
  158. 158         SloveTelDataUtil slo = new SloveTelDataUtil();
  159. 159         
  160. 160         if (telType == TelgramType.Schedule)
  161. 161         {
  162. 162             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort);
  163. 163             //buffer已经被处理一遍了 使用新的长度
  164. 164             receiveBufferLen = receiveBuffer.Count;
  165. 165             //解析出的每一个对象都触发 onreceive
  166. 166             for (int i = 0; i < dataEntitys.Count; i++)
  167. 167             {
  168. 168                 if (onReceive != null)
  169. 169                     onReceive(dataEntitys[i]);
  170. 170             }
  171. 171         }
  172. 172         else if (telType == TelgramType.SDBMsg)
  173. 173         {
  174. 174
  175. 175         }
  176. 176
  177. 177     }
  178. 178
  179. 179   
  180. 180     public void Close()
  181. 181     {
  182. 182         try
  183. 183         {
  184. 184             receiveBuffer.Clear();
  185. 185             receiveBufferLen = 0;
  186. 186             if (workingSocket != null && workingSocket.Connected)
  187. 187                 workingSocket.Close();
  188. 188         }
  189. 189         catch (Exception ex)
  190. 190         {
  191. 191             Console.WriteLine(ex.Message);
  192. 192         }
  193. 193         
  194. 194     }
  195. 195 }
复制代码
数据拆包与封包粘包处理

上面的代码可以看到 数据包处理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包后从缓冲区移除 解析多少个包触发多少次事件,残余数据留在缓冲区 然后继续开始新的beginReceive往缓冲区加。在异步机制中 到达endReceive的时候数据已经在缓冲区里了,这个自不用多说噻。数据包和粘包逻辑在公共类库里供客户端服务端共同调用
以下是粘包处理逻辑
  1.   1 public class SloveTelDataUtil
  2.   2 {
  3.   3     List<Telegram_Schedule> solveList;
  4.   4     public SloveTelDataUtil()
  5.   5     {
  6.   6     }
  7.   7     
  8.   8     List<byte> buffer;
  9.   9     int bufferLen;
  10. 10     int bufferIndex = 0;
  11. 11     string remoteIPPort;
  12. 12     public List<Telegram_Schedule> Slove_Telegram_Schedule( List< byte> _buffer,int _bufferLen,string _remoteIPPort)
  13. 13     {
  14. 14
  15. 15         solveList = new List<Telegram_Schedule>();
  16. 16
  17. 17         bufferIndex = 0;
  18. 18
  19. 19         buffer = _buffer;
  20. 20         bufferLen = _bufferLen;
  21. 21         remoteIPPort = _remoteIPPort;
  22. 22
  23. 23         //小于最小长度 直接返回
  24. 24         if (bufferLen < 12)
  25. 25             return solveList;
  26. 26
  27. 27         //进行数据解析
  28. 28         bool anaysisOK = false;
  29. 29         while (anaysisOK=AnaysisData_Schedule()==true)//直到解析的不能解析为止
  30. 30         {               
  31. 31         }
  32. 32         return solveList;
  33. 33     }
  34. 34
  35. 35     public bool AnaysisData_Schedule()
  36. 36     {
  37. 37         if (bufferLen - bufferIndex < GlobalSymbol.Headerlen)
  38. 38             return false;
  39. 39
  40. 40         //解析出一个数据对象
  41. 41         Telegram_Schedule telObj = new Telegram_Schedule();
  42. 42
  43. 43         //必定是大于最小数据大小的
  44. 44         telObj.dataBytesHeader = new byte[GlobalSymbol.Headerlen];
  45. 45         buffer.CopyTo(bufferIndex, telObj.dataBytesHeader, 0, GlobalSymbol.Headerlen);
  46. 46
  47. 47         byte[] btsHeader = new byte[4];
  48. 48         byte[] btsCommand = new byte[4];
  49. 49         byte[] btsLen = new byte[4];
  50. 50
  51. 51         btsHeader[0] = buffer[bufferIndex];
  52. 52         btsHeader[1] = buffer[bufferIndex+1];
  53. 53         btsHeader[2] = buffer[bufferIndex+2];
  54. 54         btsHeader[3] = buffer[bufferIndex+3];
  55. 55
  56. 56         bufferIndex += 4;
  57. 57
  58. 58         btsCommand[0] = buffer[bufferIndex];
  59. 59         btsCommand[1] = buffer[bufferIndex + 1];
  60. 60         btsCommand[2] = buffer[bufferIndex + 2];
  61. 61         btsCommand[3] = buffer[bufferIndex + 3];
  62. 62
  63. 63         bufferIndex += 4;
  64. 64
  65. 65         btsLen[0] = buffer[bufferIndex];
  66. 66         btsLen[1] = buffer[bufferIndex + 1];
  67. 67         btsLen[2] = buffer[bufferIndex + 2];
  68. 68         btsLen[3] = buffer[bufferIndex + 3];
  69. 69
  70. 70         bufferIndex += 4;
  71. 71
  72. 72         
  73. 73
  74. 74         int dataLen = BitConverter.ToInt32(btsLen, 0);
  75. 75         telObj.header = BitConverter.ToUInt32(btsHeader, 0);
  76. 76         telObj.command = BitConverter.ToInt32(btsCommand, 0);
  77. 77         telObj.remoteIPPort = remoteIPPort;
  78. 78
  79. 79         if(dataLen>0)
  80. 80         {
  81. 81             //数据区小于得到的数据长度 说明数据部分还没接收到 不删除缓冲区 不做任何处理
  82. 82             //下次来了连着头一起解析
  83. 83             if (bufferLen - GlobalSymbol.Headerlen < dataLen)
  84. 84             {
  85. 85
  86. 86                 bufferIndex -= 12;//
  87. 87
  88. 88
  89. 89                 return false;
  90. 90
  91. 91             }
  92. 92             else
  93. 93             {
  94. 94
  95. 95                 telObj.dataLen = dataLen;
  96. 96                 telObj.dataBytes = new byte[dataLen];
  97. 97                 buffer.CopyTo(bufferIndex, telObj.dataBytes, 0, dataLen);
  98. 98                 
  99. 99                 solveList.Add(telObj);
  100. 100                 //bufferIndex += dataLen;
  101. 101
  102. 102                 //解析成功一次 移除已解析的
  103. 103                 for (int i = 0; i < GlobalSymbol.Headerlen+dataLen; i++)
  104. 104                 {
  105. 105                     buffer.RemoveAt(0);
  106. 106                 }
  107. 107                 bufferIndex = 0;
  108. 108                 bufferLen = buffer.Count;
  109. 109                 return true;
  110. 110             }
  111. 111         }
  112. 112         else
  113. 113         {
  114. 114            
  115. 115             telObj.dataLen = 0;
  116. 116             solveList.Add(telObj);
  117. 117             //bufferIndex += 0;
  118. 118             //解析成功一次 移除已解析的
  119. 119             for (int i = 0; i < GlobalSymbol.Headerlen; i++)
  120. 120             {
  121. 121                 buffer.RemoveAt(0);
  122. 122             }
  123. 123             //解析成功一次因为移除了缓冲区 bufferIndex置0
  124. 124             bufferIndex = 0;
  125. 125             bufferLen = buffer.Count;
  126. 126             return true;
  127. 127         }
  128. 128
  129. 129     }
  130. 130
  131. 131     
  132. 132     public List<Telegram_SDBMsg> Slove_Telegram_SDBMsg(ref byte[] buffer)
  133. 133     {
  134. 134         return new List<Telegram_SDBMsg>();
  135. 135     }
  136. 136 }
复制代码
我们看到用到的数据包对象是Telegram_Schedule ,中间保存有报文数据,数据发送的目标等信息。
以下是数据包结构代码
  1. 1 public class Telegram_Base
  2. 2 {
  3. 3     public string remoteIPPort { get; set; }
  4. 4     //数据内容
  5. 5     public byte[] dataBytes { get; set; }
  6. 6     //头部内容的序列化
  7. 7     public byte[] dataBytesHeader { get; set; }
  8. 8
  9. 9     public string jsonStr { get; set; }
  10. 10     virtual public void SerialToBytes()
  11. 11     {
  12. 12
  13. 13     }
  14. 14
  15. 15     virtual public void SloveToTel()
  16. 16     {
  17. 17
  18. 18     }
  19. 19
  20. 20 }
  21. 21
  22. 22 public class Telegram_Schedule:Telegram_Base
  23. 23 {
  24. 24     
  25. 25     //头部标识 4字节
  26. 26     public UInt32 header { get; set; }
  27. 27     //命令对应枚举的 int 4字节
  28. 28     public int command { get; set; }
  29. 29     //数据长度 4字节
  30. 30     public int dataLen { get; set; }
  31. 31
  32. 32     
  33. 33
  34. 34     override public void SerialToBytes()
  35. 35     {
  36. 36         //有字符串数据 但是待发送字节是空
  37. 37         if ((string.IsNullOrEmpty(jsonStr) == false ))//&& (dataBytes==null || dataBytes.Length==0)
  38. 38         {
  39. 39             dataBytes = Encoding.UTF8.GetBytes(jsonStr);
  40. 40             dataLen = dataBytes.Length;
  41. 41             dataBytesHeader = new byte[GlobalSymbol.Headerlen];
  42. 42           
  43. 43             header = GlobalSymbol.HeaderSymbol;
  44. 44            
  45. 45             byte[] btsHeader = BitConverter.GetBytes(header);
  46. 46             byte[] btsCommand = BitConverter.GetBytes(command);
  47. 47             byte[] btsLen = BitConverter.GetBytes(dataLen);
  48. 48
  49. 49             Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
  50. 50             Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
  51. 51             Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
  52. 52
  53. 53         }
  54. 54         else if((string.IsNullOrEmpty(jsonStr) == true )&& (dataBytes==null || dataBytes.Length==0)){
  55. 55             dataLen = 0;
  56. 56             dataBytes = new byte[0];
  57. 57
  58. 58             dataBytesHeader = new byte[GlobalSymbol.Headerlen];
  59. 59
  60. 60             header = GlobalSymbol.HeaderSymbol;
  61. 61
  62. 62             byte[] btsHeader = BitConverter.GetBytes(header);
  63. 63             byte[] btsCommand = BitConverter.GetBytes(command);
  64. 64             byte[] btsLen = BitConverter.GetBytes(dataLen);
  65. 65
  66. 66             Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
  67. 67             Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
  68. 68             Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
  69. 69         }
  70. 70     }
  71. 71
  72. 72     override public void SloveToTel()
  73. 73     {
  74. 74         //只解析字符串数据部分 ,header 和len 在接收之初就已解析
  75. 75         this.jsonStr = Encoding.UTF8.GetString(this.dataBytes);
  76. 76     }
  77. 77
  78. 78 }
复制代码
客户端代码

最后是客户端,有了上面的结构,客户端就不足为谈了,稍微了解socket的人都熟知套路的 基本跟EndpointClient一致
  1.   1 public class MsgClientSchedule
  2.   2 {
  3.   3     Socket workingSocket;
  4.   4     //缓冲区最大数据长度
  5.   5     static int receiveBufferLenMax = 5000;
  6.   6     //单次receive数据(取决于tcp底层封包 但是不会超过缓冲区最大长度
  7.   7     byte[] onceReadDatas = new byte[receiveBufferLenMax];
  8.   8     //未解析到完整数据包时的残余数据保存区
  9.   9     List<byte> receiveBuffer = new List<byte>(receiveBufferLenMax);
  10. 10
  11. 11     string serverIP { get; set; }
  12. 12     int serverPort { get; set; }
  13. 13     public string localIPPort { get; set; }
  14. 14
  15. 15     //残余缓冲区数据长度
  16. 16     int receiveBufferLen = 0;
  17. 17
  18. 18     bool _isConnected { get; set; }
  19. 19
  20. 20     TelgramType telType;
  21. 21
  22. 22     //收一个包时触发
  23. 23     public Action<Telegram_Base> onReceive;
  24. 24     //与服务端断链时触发
  25. 25     public Action<string> onClientDel;
  26. 26
  27. 27
  28. 28     public bool isConnected { get { return _isConnected; } }
  29. 29     public MsgClientSchedule(string _serverIP,int _port)
  30. 30     {
  31. 31         serverIP = _serverIP;
  32. 32         serverPort = _port;
  33. 33         _isConnected = false;
  34. 34     }
  35. 35
  36. 36     public void Connect()
  37. 37     {
  38. 38         try
  39. 39         {
  40. 40             workingSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
  41. 41             IPEndPoint ipport = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);
  42. 42             workingSocket.Connect(ipport);
  43. 43
  44. 44             localIPPort = workingSocket.LocalEndPoint.ToString();
  45. 45             _isConnected = true;
  46. 46             BeginReceive();
  47. 47         }
  48. 48         catch (Exception ex)
  49. 49         {
  50. 50             workingSocket = null;
  51. 51             _isConnected = false;
  52. 52
  53. 53             Console.WriteLine(ex.Message);
  54. 54         }
  55. 55
  56. 56     }
  57. 57
  58. 58
  59. 59
  60. 60
  61. 61     public void Send(Telegram_Base tel)
  62. 62     {
  63. 63         try
  64. 64         {
  65. 65             if(_isConnected==false)
  66. 66             {
  67. 67                 Console.WriteLine("未连接到服务器");
  68. 68                 return;
  69. 69             }
  70. 70             if (tel is Telegram_Schedule)
  71. 71             {
  72. 72                 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
  73. 73                 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
  74. 74                 {
  75. 75                     Console.WriteLine("尝试发送数据长度格式错误的报文");
  76. 76                     return;
  77. 77                 }
  78. 78                 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
  79. 79                 byte[] sendbytes = telBeSend.dataBytes;
  80. 80
  81. 81                 //数据超过缓冲区长度 会导致无法拆包
  82. 82                 if (sendbytes.Length <= receiveBufferLenMax)
  83. 83                 {
  84. 84                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
  85. 85                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0, null, null
  86. 86                        
  87. 87                     );
  88. 88                 }
  89. 89                 else
  90. 90                 {
  91. 91                     Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
  92. 92                     throw new Exception("发送到调度客户端的数据超过缓冲区长度");
  93. 93                 }
  94. 94
  95. 95                 
  96. 96             }
  97. 97             else if (tel is Telegram_SDBMsg)
  98. 98             {
  99. 99
  100. 100             }
  101. 101
  102. 102         }
  103. 103         catch (Exception ex)
  104. 104         {
  105. 105             Close();
  106. 106             Console.WriteLine(ex.Message);
  107. 107             //throw ex;
  108. 108         }
  109. 109     }
  110. 110
  111. 111     public void BeginReceive()
  112. 112     {
  113. 113         receiveBufferLen = 0;
  114. 114         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
  115. 115             ReceiveCallback,
  116. 116            
  117. 117         this);
  118. 118     }
  119. 119     private void ReceiveCallback(IAsyncResult iar)
  120. 120     {
  121. 121         try
  122. 122         {
  123. 123             MsgClientSchedule cli = (MsgClientSchedule)iar.AsyncState;
  124. 124             Socket socket = cli.workingSocket;
  125. 125             int reads = socket.EndReceive(iar);
  126. 126
  127. 127             if (reads > 0)
  128. 128             {
  129. 129
  130. 130                 for (int i = 0; i < reads; i++)
  131. 131                 {
  132. 132                     receiveBuffer.Add(onceReadDatas[i]);
  133. 133                 }
  134. 134
  135. 135                 //具体填充了多少看返回值 此时 数据已经在buffer中了
  136. 136
  137. 137                 receiveBufferLen += reads;
  138. 138
  139. 139                 //加完了后解析 阻塞式处理 结束后开启新的接收
  140. 140                 SloveTelData();
  141. 141
  142. 142
  143. 143
  144. 144                 if (receiveBufferLenMax - receiveBufferLen > 0)
  145. 145                 {
  146. 146                     //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
  147. 147                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
  148. 148                 }
  149. 149                 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
  150. 150                 {
  151. 151                     Close();
  152. 152                     
  153. 153                     Console.WriteLine("服务端接口解析数据出现异常");
  154. 154                     throw new Exception("服务端接口解析数据出现异常");
  155. 155                 }
  156. 156             }
  157. 157             else//reads==0客户端已关闭
  158. 158             {
  159. 159                 Close();                    
  160. 160             }
  161. 161         }
  162. 162         catch (Exception ex)
  163. 163         {
  164. 164             Close();
  165. 165            
  166. 166             Console.WriteLine("ReceiveCallback Error");
  167. 167             Console.WriteLine(ex.Message);
  168. 168         }
  169. 169
  170. 170     }
  171. 171     private void SloveTelData()
  172. 172     {
  173. 173         
  174. 174         //进行数据解析
  175. 175         SloveTelDataUtil slo = new SloveTelDataUtil();
  176. 176
  177. 177         if (telType == TelgramType.Schedule)
  178. 178         {
  179. 179             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen,serverIP+":"+serverPort.ToString());
  180. 180             //buffer已经被处理一遍了 使用新的长度
  181. 181             receiveBufferLen = receiveBuffer.Count;
  182. 182             //解析出的每一个对象都触发 onreceive
  183. 183             for (int i = 0; i < dataEntitys.Count; i++)
  184. 184             {
  185. 185                 if (onReceive != null)
  186. 186                     onReceive(dataEntitys[i]);
  187. 187             }
  188. 188         }
  189. 189         else if (telType == TelgramType.SDBMsg)
  190. 190         {
  191. 191
  192. 192         }
  193. 193
  194. 194     }
  195. 195
  196. 196
  197. 197     public void Close()
  198. 198     {
  199. 199         try
  200. 200         {
  201. 201             _isConnected = false;
  202. 202
  203. 203             receiveBuffer.Clear();
  204. 204             receiveBufferLen = 0;
  205. 205             if (workingSocket != null && workingSocket.Connected)
  206. 206                 workingSocket.Close();
  207. 207         }
  208. 208         catch (Exception ex)
  209. 209         {
  210. 210             Console.WriteLine(ex.Message);
  211. 211         }
  212. 212
  213. 213     }
  214. 214
  215. 215 }
复制代码
服务端调用

构建一个winform基本项目
  1. 1 List<string> clients;
  2. 2 MsgServerSchedule server;
  3. 3 private void btn_start_Click(object sender, EventArgs e)
  4. 4 {
  5. 5     server = new MsgServerSchedule(int.Parse(tbx_port.Text));
  6. 6
  7. 7
  8. 8     server.Start();
  9. 9     if (server.isRunning == true)
  10. 10     {
  11. 11         btn_start.Enabled = false;
  12. 12
  13. 13         server.onReceive += new Action<Telegram_Base>(
  14. 14         (tel) =>
  15. 15         {
  16. 16             this.BeginInvoke(new Action(() =>
  17. 17             {
  18. 18                 if (tel is Telegram_Schedule)
  19. 19                 {
  20. 20                     Telegram_Schedule ts = tel as Telegram_Schedule;
  21. 21                     ts.SloveToTel();
  22. 22                     Console.WriteLine(string.Format("commandType:{0}", ((ScheduleTelCommandType)ts.command).ToString()));
  23. 23
  24. 24                     tbx_msgs.Text += ts.remoteIPPort + ">" + ts.jsonStr + "\r\n";
  25. 25
  26. 26                     //数据回发测试
  27. 27                     string fromip = ts.remoteIPPort;
  28. 28                     string srcMsg = ts.jsonStr;
  29. 29                     string fromServerMsg = ts.jsonStr + " -from server";
  30. 30                     ts.jsonStr = fromServerMsg;
  31. 31
  32. 32
  33. 33                     //如果消息里有指向信息 则转送到对应的客户端
  34. 34                     if (clients != null)
  35. 35                     {
  36. 36                         string to = null;
  37. 37                         for (int i = 0; i < clients.Count; i++)
  38. 38                         {
  39. 39                             if (srcMsg.Contains(clients[i]))
  40. 40                             {
  41. 41                                 to = clients[i];
  42. 42                                 break;
  43. 43                             }
  44. 44                         }
  45. 45
  46. 46                         if (to != null)
  47. 47                         {
  48. 48                             ts.remoteIPPort = to;
  49. 49                             string toMsg;
  50. 50                             //toMsg= srcMsg.Replace(to, "");
  51. 51                             toMsg = srcMsg.Replace(to, fromip);
  52. 52                             ts.jsonStr = toMsg;
  53. 53                             ts.SerialToBytes();
  54. 54
  55. 55                             server.SendToSomeOne(ts);
  56. 56                         }
  57. 57                         else
  58. 58                         {
  59. 59                             ts.SerialToBytes();
  60. 60                             server.SendToSomeOne(ts);
  61. 61                         }
  62. 62                     }
  63. 63                 }
  64. 64             }));
  65. 65
  66. 66         }
  67. 67         );
  68. 68
  69. 69         server.onClientAddDel += new Action<List<string>>((onlines) =>
  70. 70         {
  71. 71             this.BeginInvoke(
  72. 72                 new Action(() =>
  73. 73                 {
  74. 74                     clients = onlines;
  75. 75                     listbox_clients.Items.Clear();
  76. 76
  77. 77                     for (int i = 0; i < onlines.Count; i++)
  78. 78                     {
  79. 79                         listbox_clients.Items.Add(onlines[i]);
  80. 80                     }
  81. 81                 }));
  82. 82         });
  83. 83     }
  84. 84 }
  85. 85 private void btn_sendAll_Click(object sender, EventArgs e)
  86. 86 {
  87. 87     Telegram_Schedule tel = new Telegram_Schedule();
  88. 88     tel.header = GlobalSymbol.HeaderSymbol;
  89. 89     tel.command = (int)ScheduleTelCommandType.StartC2S;
  90. 90     tel.jsonStr = tbx_sendAll.Text;
  91. 91     tel.SerialToBytes();
  92. 92
  93. 93     server.SendToAll(tel);
  94. 94 }
复制代码
客户端调用
  1. 1 MsgClientSchedule client;
  2. 2
  3. 3 private void btn_start_Click(object sender, EventArgs e)
  4. 4 {
  5. 5     client = new MsgClientSchedule(tbx_ip.Text, int.Parse(tbx_port.Text));
  6. 6
  7. 7     client.Connect();
  8. 8
  9. 9     if (client.isConnected == true)
  10. 10     {
  11. 11         btn_start.Enabled = false;
  12. 12         
  13. 13         label1.Text = client.localIPPort;
  14. 14
  15. 15         client.onReceive = new Action<Telegram_Base>((tel) =>
  16. 16         {
  17. 17             this.BeginInvoke(
  18. 18                 new Action(() =>
  19. 19                 {
  20. 20                     tel.SloveToTel();
  21. 21                     tbx_rec.Text += tel.jsonStr + "\r\n";
  22. 22
  23. 23                 }));
  24. 24         });
  25. 25     }
  26. 26
  27. 27 }
  28. 28
  29. 29
  30. 30
  31. 31 private void btn_send_Click(object sender, EventArgs e)
  32. 32 {
  33. 33
  34. 34     if (client == null || client.isConnected == false)
  35. 35         return;
  36. 36
  37. 37     //for (int i = 0; i < 2; i++)
  38. 38     //{
  39. 39         Telegram_Schedule tel = new Telegram_Schedule();
  40. 40         tel.command = (int)ScheduleTelCommandType.MsgC2S;
  41. 41     
  42. 42         tel.jsonStr = tbx_remoteip.Text+">"+ tbx_msgSend.Text;
  43. 43         tel.SerialToBytes();//发出前要先序列化
  44. 44
  45. 45         client.Send(tel);
  46. 46     //}
  47. 47     
  48. 48 }
复制代码
实现效果

可以多客户端连接互相自由发送消息,服务端可以编写转发规则代码,那些什么棋牌啊 互动白板 以及其他类似的应用就可以基于此之上发挥想象了

 

来源:https://www.cnblogs.com/assassinx/archive/2023/02/25/17154167.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具