宏丰皮具 发表于 2023-2-25 16:10:56

一个简易socket通信结构

服务端

工作需要又需要用到socketTCP通讯,这么多年了,终于稍微能写点了。让我说其实也说不出个啥来,看了很多的异步后稍微对异步socket的导流 endreceive后 再beginreceive 形成一个内循环有了个认识,加上我自己的封包拆包机制,然后再仿那些其它的大多数代码结构弄点onReceive事件进行 收包触发。整个过程就算差不多了 ,基本上是能够可靠运行的 靠谱的 中规中矩的,要说啥创新读到嘛真的谈不上。代码中写了很多low逼注释 也是为了方便自己理解 请无视。下面是server端代码,使用异步机制accept 异步receive ,成员有 clients代表当前在线的客户端 客户端socket包装为EndpointClient ,有onClientAddDel 代表客户端上线掉线事件,有onReceive代表所有客户端的收包事件,clients由于是异步的多线程访问就要涉及多线程管控 所以使用lock ,服务端有sendToAll() 和SendToSomeOne()毫无疑问这也是通过调用特定的clients来做的。
以下是服务端代码
1 public class MsgServerSchedule
2 {
3
4
5   Socket serverSocket;
6   public Action<List<string>> onClientAddDel;
7   public Action<Telegram_Base> onReceive;
8   bool _isRunning = false;
9
10   
11   int port;
12
13   public TelgramType telType;
14
15   static List<EndpointClient> clients;
16
17   public bool isRunning { get { return _isRunning; } }
18   public MsgServerSchedule(int _port)
19   {
20         //any 就决定了 ip地址格式是v4
21         //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654);
22         //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
23
24         this.port = _port;
25
26         clients = new List<EndpointClient>();
27
28         Console.WriteLine("constructor");
29
30   }
31
32   public void Start()
33   {
34         try
35         {
36             IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
37             serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
38             serverSocket.Bind(endPoint);
39             serverSocket.Listen(port);
40
41             serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket);
42
43             _isRunning = true;
44             Console.WriteLine("start");
45         }
46         catch (Exception ex)
47         {
48             _isRunning = false;
49             serverSocket = null;
50
51             Console.WriteLine("服务启动出现错误,可能端口已被占用:"+port);
52             Console.WriteLine(ex.Message);
53         }
54      
55   }
56
57   public void Stop()
58   {
59         for (int i = 0; i < clients.Count; i++)
60         {
61             clients.Close();               
62         }
63         ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll);
64         serverSocket.Close();
65         _isRunning = false;
66   }
67
68   public void SendToAll(Telegram_Base tel)
69   {
70         for (int i = 0; i < clients.Count; i++)
71         {
72             clients.Send(tel);
73         }
74   }
75
76   public void SendToSomeOne(Telegram_Base tel)
77   {
78         for (int i = 0; i < clients.Count; i++)
79         {
80             if(clients.remoteIPPort==tel.remoteIPPort)
81             {
82               clients.Send(tel);
83               break;
84             }
85         }
86   }
87
88   //新增与删除客户端 秉持原子操作
89   List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType)
90   {
91         //异步同时有新客户端上线 与下线 不进行资源互斥访问会报错
92         lock (this)
93         {
94             if (changeType == EndPointClientsChangeType.Add)
95             {
96               clients.Add(cli);
97             }
98             else if(changeType== EndPointClientsChangeType.Del)
99             {
100               var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort);
101               if (beRemoveClient != null)
102                     clients.Remove(beRemoveClient);
103             }
104             else if(changeType== EndPointClientsChangeType.ClearAll)
105             {
106               clients.Clear();
107             }
108             else if (changeType == EndPointClientsChangeType.GetAll)
109             {
110               List<string> onLines = new List<string>(clients.Count);
111               for (int i = 0; i < clients.Count; i++)
112               {
113                     onLines.Add(clients.remoteIPPort);
114               }
115               return onLines;
116             }
117             else
118             {
119               return null;
120             }
121         }
122         return null;
123   }
124   //异步监听客户端 有客户端到来时的回调
125   private void AcceptCallback(IAsyncResult iar)
126   {
127         //server端一直在receive 能够感知到客户端掉线 (连上后 关闭客户端 server立即有错误爆出)
128         //但是同情况 关闭server端 客户端无错误爆出 直到点发送 才有错误爆出
129         //由此得出 处于receive才会有掉线感知,send时发现发不出去自然也会有感知 跟人的正常思维理解是一样的
130         //虽然tcp是所谓的长连接 ,通过反复测试->但是双方相互都处在一个静止状态 是无法 确定在不在的
131         //连上后平常的情况下 并没有数据流通 的 ,双方只是一个状态的保持而已。
132         //这也是为什么 好多服务 客户端 程序 都有个心跳机制(由此我们可以想到继续完善 弄个客户端列表 心跳超时的剔除列表 正常发消息send 或receive 异常的剔除列表 删除clientSocket
133         //其实非要说吧 一般情况 服务端一直在receive 不用心跳其实也是可以的(客户端可能是真的需要
134         //tcp底层就已经有了一个判断对方在不在的机制 , 对方直接关程序 结束进程 这些 只要tcp在receive就立即能够感知 所以说心跳 用不用看情况吧
135
136         //tcp 不会丢包 哪怕是连接建立了   你还没开始receive   对方却先发了,
137         //对方只要是发了的数据 都由操作系统像个缓存样给你放那的 不会掉 你再隔10秒开始receive都能rec的到
138
139         //tcp甚至在拔掉网线 再重新插上 都可以保证数据一致性
140         //tcp的包顺序能够保证 先发的先到
141
142         //nures代码中那些beginreceivexxx异步receive的核心机制就是 ,假定数据到的时候把数据保存到xxx数组
143         //真正endreceive的时候 其实数据已经接收 处理完成了
144
145         try
146         {
147
148             //处理完当前accept
149             Socket currentSocket = serverSocket.EndAccept(iar);
150
151             EndpointClient client = new EndpointClient(currentSocket,telType);
152
153             //新增客户端
154             ClientAddDelGetList(client, EndPointClientsChangeType.Add);
155            
156             if (onClientAddDel != null)
157             {
158               List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
159               onClientAddDel(onlines);
160
161               //客户端异常掉线
162               client.onClientDel = new Action<string>((_remoteIPPort) =>
163               {
164                     ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del);
165
166                     List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
167                     onClientAddDel(onlines2);
168               });
169             }
170
171            
172
173             //这句到时调用完成后 就会自动把 receivebuffer填充 //要接收的字节数 系统底层操作一次接收多少字节 其实意义不大
174             //总是从0开始(就是说并发时会覆盖
175
176             Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString()));
177
178             //currentSocket.Close();
179             //Application.Exit();
180
181             //Thread.Sleep(1000 * 10);
182             client.onReceive += this.onReceive;
183
184             client.BeginReceive();
185
186
187             //立即开始accept新的客户端
188             if (isRunning == true && serverSocket != null)
189               serverSocket.BeginAccept(AcceptCallback, serverSocket);
190             //beginAccept 最开始的方法可以不一样 ,但最终肯定是一个不断accept的闭环过程
191             //整个过程就像个导流样 ,最开始用异步导流到一个固定的点 然后让其循环源源不断运转
192
193             //加asynccallback 有什么不一样么
194             //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket);
195
196         }
197         catch (Exception ex)
198         {
199             Console.WriteLine("AcceptCallback Error");
200             Console.WriteLine(ex.Message);
201         }
202
203   }
204
205   
206 }EndpointClient终端代码代表客户端的对口人,他的onReceive 等资源从服务端继承 ,如果服务端想给某个特定客户端发数据则会调用他们中的某一个 毫无疑问这是通过remoteIPport来判断的,这些都是编写基本socket结构轻车熟路的老套路
以下EndpointClient代码
1 public class EndpointClient
2 {
3   Socket workingSocket;
4   static int receiveBufferLenMax = 5000;
5   byte[] onceReadDatas = new byte;
6   List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax);
7
8   public string remoteIPPort { get; set; }
9   
10   //当前残留数据区 接收数据的起始指针(也代表缓冲区数据长度
11   int receiveBufferLen = 0;
12
13
14   TelgramType telType;
15
16   public Action<Telegram_Base> onReceive;
17   public Action<string> onClientDel;
18
19   public EndpointClient()
20   {
21
22   }
23   public EndpointClient(Socket _socket,TelgramType _telType)
24   {
25         this.remoteIPPort = _socket.RemoteEndPoint.ToString();
26         this.telType = _telType;
27         workingSocket = _socket;
28   }
29
30   public void Send(Telegram_Base tel)
31   {
32         //try
33         //{
34             if(workingSocket==null)
35             {
36               Console.WriteLine("未初始化的EndpointClient");
37               return;
38             }
39             if (tel is Telegram_Schedule)
40             {
41               Telegram_Schedule telBeSend = tel as Telegram_Schedule;
42               if (telBeSend.dataBytes.Length != telBeSend.dataLen)
43               {
44                     Console.WriteLine("尝试发送数据长度格式错误的报文");
45                     return;
46               }
47
48               byte[] sendBytesHeader = telBeSend.dataBytesHeader;
49               byte[] sendbytes = telBeSend.dataBytes;
50
51               //数据超过缓冲区长度 会导致无法拆包
52               if (sendbytes.Length <= receiveBufferLenMax)
53               {
54                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
55                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null
56                     
57                     );
58               }
59               else
60               {
61                     Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
62                     throw new Exception("发送到调度客户端的数据超过缓冲区长度");
63               }
64
65             }
66             else if (tel is Telegram_SDBMsg)
67             {
68
69             }
70
71         //}
72         //catch (Exception ex)
73         //{
74
75         //    Console.WriteLine(ex.Message);
76         //    throw ex;
77         //}
78   }
79
80   public void BeginReceive()
81   {
82         if (workingSocket == null)
83         {
84             Console.WriteLine("未初始化的EndpointClient");
85             return;
86         }
87
88         receiveBufferLen = 0;
89         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
90             ReceiveCallback,
91         this);
92   }
93   private void ReceiveCallback(IAsyncResult iar)
94   {
95         try
96         {
97             EndpointClient cli = (EndpointClient)iar.AsyncState;
98             Socket socket = cli.workingSocket;
99             int reads = socket.EndReceive(iar);
100
101             if (reads > 0)
102             {
103
104               for (int i = 0; i < reads; i++)
105               {
106                     receiveBuffer.Add(onceReadDatas);
107               }
108
109               //具体填充了多少看返回值 此时 数据已经在buffer中了
110               receiveBufferLen += reads;
111               //加完了后解析 阻塞式处理 结束后开启新的接收
112               SloveTelData();
113
114               if (receiveBufferLenMax - receiveBufferLen > 0)
115               {
116                     //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
117                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
118               }
119               else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
120               {
121                     Close();
122                     //移除自己
123                     if (onClientDel != null)
124                     {
125                         onClientDel(remoteIPPort);
126                     }
127                     Console.WriteLine("服务端接口解析数据出现异常");
128                     throw new Exception("服务端接口解析数据出现异常");
129               }
130             }
131             else//reads==0 客户端已关闭
132             {
133               Close();
134               //移除自己
135               if (onClientDel != null)
136               {
137                     onClientDel(remoteIPPort);
138               }
139             }
140         }
141         catch (Exception ex)
142         {
143             Close();
144             //移除自己
145             if (onClientDel != null)
146             {
147               onClientDel(remoteIPPort);
148             }
149
150             Console.WriteLine("ReceiveCallback Error");
151             Console.WriteLine(ex.Message);
152         }
153
154   }
155   void SloveTelData()
156   {
157         //进行数据解析
158         SloveTelDataUtil slo = new SloveTelDataUtil();
159         
160         if (telType == TelgramType.Schedule)
161         {
162             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort);
163             //buffer已经被处理一遍了 使用新的长度
164             receiveBufferLen = receiveBuffer.Count;
165             //解析出的每一个对象都触发 onreceive
166             for (int i = 0; i < dataEntitys.Count; i++)
167             {
168               if (onReceive != null)
169                     onReceive(dataEntitys);
170             }
171         }
172         else if (telType == TelgramType.SDBMsg)
173         {
174
175         }
176
177   }
178
179   
180   public void Close()
181   {
182         try
183         {
184             receiveBuffer.Clear();
185             receiveBufferLen = 0;
186             if (workingSocket != null && workingSocket.Connected)
187               workingSocket.Close();
188         }
189         catch (Exception ex)
190         {
191             Console.WriteLine(ex.Message);
192         }
193         
194   }
195 }数据拆包与封包粘包处理

上面的代码可以看到 数据包处理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包后从缓冲区移除 解析多少个包触发多少次事件,残余数据留在缓冲区 然后继续开始新的beginReceive往缓冲区加。在异步机制中 到达endReceive的时候数据已经在缓冲区里了,这个自不用多说噻。数据包和粘包逻辑在公共类库里供客户端服务端共同调用
以下是粘包处理逻辑
1 public class SloveTelDataUtil
2 {
3   List<Telegram_Schedule> solveList;
4   public SloveTelDataUtil()
5   {
6   }
7   
8   List<byte> buffer;
9   int bufferLen;
10   int bufferIndex = 0;
11   string remoteIPPort;
12   public List<Telegram_Schedule> Slove_Telegram_Schedule( List< byte> _buffer,int _bufferLen,string _remoteIPPort)
13   {
14
15         solveList = new List<Telegram_Schedule>();
16
17         bufferIndex = 0;
18
19         buffer = _buffer;
20         bufferLen = _bufferLen;
21         remoteIPPort = _remoteIPPort;
22
23         //小于最小长度 直接返回
24         if (bufferLen < 12)
25             return solveList;
26
27         //进行数据解析
28         bool anaysisOK = false;
29         while (anaysisOK=AnaysisData_Schedule()==true)//直到解析的不能解析为止
30         {               
31         }
32         return solveList;
33   }
34
35   public bool AnaysisData_Schedule()
36   {
37         if (bufferLen - bufferIndex < GlobalSymbol.Headerlen)
38             return false;
39
40         //解析出一个数据对象
41         Telegram_Schedule telObj = new Telegram_Schedule();
42
43         //必定是大于最小数据大小的
44         telObj.dataBytesHeader = new byte;
45         buffer.CopyTo(bufferIndex, telObj.dataBytesHeader, 0, GlobalSymbol.Headerlen);
46
47         byte[] btsHeader = new byte;
48         byte[] btsCommand = new byte;
49         byte[] btsLen = new byte;
50
51         btsHeader = buffer;
52         btsHeader = buffer;
53         btsHeader = buffer;
54         btsHeader = buffer;
55
56         bufferIndex += 4;
57
58         btsCommand = buffer;
59         btsCommand = buffer;
60         btsCommand = buffer;
61         btsCommand = buffer;
62
63         bufferIndex += 4;
64
65         btsLen = buffer;
66         btsLen = buffer;
67         btsLen = buffer;
68         btsLen = buffer;
69
70         bufferIndex += 4;
71
72         
73
74         int dataLen = BitConverter.ToInt32(btsLen, 0);
75         telObj.header = BitConverter.ToUInt32(btsHeader, 0);
76         telObj.command = BitConverter.ToInt32(btsCommand, 0);
77         telObj.remoteIPPort = remoteIPPort;
78
79         if(dataLen>0)
80         {
81             //数据区小于得到的数据长度 说明数据部分还没接收到 不删除缓冲区 不做任何处理
82             //下次来了连着头一起解析
83             if (bufferLen - GlobalSymbol.Headerlen < dataLen)
84             {
85
86               bufferIndex -= 12;//
87
88
89               return false;
90
91             }
92             else
93             {
94
95               telObj.dataLen = dataLen;
96               telObj.dataBytes = new byte;
97               buffer.CopyTo(bufferIndex, telObj.dataBytes, 0, dataLen);
98               
99               solveList.Add(telObj);
100               //bufferIndex += dataLen;
101
102               //解析成功一次 移除已解析的
103               for (int i = 0; i < GlobalSymbol.Headerlen+dataLen; i++)
104               {
105                     buffer.RemoveAt(0);
106               }
107               bufferIndex = 0;
108               bufferLen = buffer.Count;
109               return true;
110             }
111         }
112         else
113         {
114            
115             telObj.dataLen = 0;
116             solveList.Add(telObj);
117             //bufferIndex += 0;
118             //解析成功一次 移除已解析的
119             for (int i = 0; i < GlobalSymbol.Headerlen; i++)
120             {
121               buffer.RemoveAt(0);
122             }
123             //解析成功一次因为移除了缓冲区 bufferIndex置0
124             bufferIndex = 0;
125             bufferLen = buffer.Count;
126             return true;
127         }
128
129   }
130
131   
132   public List<Telegram_SDBMsg> Slove_Telegram_SDBMsg(ref byte[] buffer)
133   {
134         return new List<Telegram_SDBMsg>();
135   }
136 }我们看到用到的数据包对象是Telegram_Schedule ,中间保存有报文数据,数据发送的目标等信息。
以下是数据包结构代码
1 public class Telegram_Base
2 {
3   public string remoteIPPort { get; set; }
4   //数据内容
5   public byte[] dataBytes { get; set; }
6   //头部内容的序列化
7   public byte[] dataBytesHeader { get; set; }
8
9   public string jsonStr { get; set; }
10   virtual public void SerialToBytes()
11   {
12
13   }
14
15   virtual public void SloveToTel()
16   {
17
18   }
19
20 }
21
22 public class Telegram_Schedule:Telegram_Base
23 {
24   
25   //头部标识 4字节
26   public UInt32 header { get; set; }
27   //命令对应枚举的 int 4字节
28   public int command { get; set; }
29   //数据长度 4字节
30   public int dataLen { get; set; }
31
32   
33
34   override public void SerialToBytes()
35   {
36         //有字符串数据 但是待发送字节是空
37         if ((string.IsNullOrEmpty(jsonStr) == false ))//&& (dataBytes==null || dataBytes.Length==0)
38         {
39             dataBytes = Encoding.UTF8.GetBytes(jsonStr);
40             dataLen = dataBytes.Length;
41             dataBytesHeader = new byte;
42         
43             header = GlobalSymbol.HeaderSymbol;
44            
45             byte[] btsHeader = BitConverter.GetBytes(header);
46             byte[] btsCommand = BitConverter.GetBytes(command);
47             byte[] btsLen = BitConverter.GetBytes(dataLen);
48
49             Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
50             Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
51             Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
52
53         }
54         else if((string.IsNullOrEmpty(jsonStr) == true )&& (dataBytes==null || dataBytes.Length==0)){
55             dataLen = 0;
56             dataBytes = new byte;
57
58             dataBytesHeader = new byte;
59
60             header = GlobalSymbol.HeaderSymbol;
61
62             byte[] btsHeader = BitConverter.GetBytes(header);
63             byte[] btsCommand = BitConverter.GetBytes(command);
64             byte[] btsLen = BitConverter.GetBytes(dataLen);
65
66             Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
67             Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
68             Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
69         }
70   }
71
72   override public void SloveToTel()
73   {
74         //只解析字符串数据部分 ,header 和len 在接收之初就已解析
75         this.jsonStr = Encoding.UTF8.GetString(this.dataBytes);
76   }
77
78 }客户端代码

最后是客户端,有了上面的结构,客户端就不足为谈了,稍微了解socket的人都熟知套路的 基本跟EndpointClient一致
1 public class MsgClientSchedule
2 {
3   Socket workingSocket;
4   //缓冲区最大数据长度
5   static int receiveBufferLenMax = 5000;
6   //单次receive数据(取决于tcp底层封包 但是不会超过缓冲区最大长度
7   byte[] onceReadDatas = new byte;
8   //未解析到完整数据包时的残余数据保存区
9   List<byte> receiveBuffer = new List<byte>(receiveBufferLenMax);
10
11   string serverIP { get; set; }
12   int serverPort { get; set; }
13   public string localIPPort { get; set; }
14
15   //残余缓冲区数据长度
16   int receiveBufferLen = 0;
17
18   bool _isConnected { get; set; }
19
20   TelgramType telType;
21
22   //收一个包时触发
23   public Action<Telegram_Base> onReceive;
24   //与服务端断链时触发
25   public Action<string> onClientDel;
26
27
28   public bool isConnected { get { return _isConnected; } }
29   public MsgClientSchedule(string _serverIP,int _port)
30   {
31         serverIP = _serverIP;
32         serverPort = _port;
33         _isConnected = false;
34   }
35
36   public void Connect()
37   {
38         try
39         {
40             workingSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
41             IPEndPoint ipport = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);
42             workingSocket.Connect(ipport);
43
44             localIPPort = workingSocket.LocalEndPoint.ToString();
45             _isConnected = true;
46             BeginReceive();
47         }
48         catch (Exception ex)
49         {
50             workingSocket = null;
51             _isConnected = false;
52
53             Console.WriteLine(ex.Message);
54         }
55
56   }
57
58
59
60
61   public void Send(Telegram_Base tel)
62   {
63         try
64         {
65             if(_isConnected==false)
66             {
67               Console.WriteLine("未连接到服务器");
68               return;
69             }
70             if (tel is Telegram_Schedule)
71             {
72               Telegram_Schedule telBeSend = tel as Telegram_Schedule;
73               if (telBeSend.dataBytes.Length != telBeSend.dataLen)
74               {
75                     Console.WriteLine("尝试发送数据长度格式错误的报文");
76                     return;
77               }
78               byte[] sendBytesHeader = telBeSend.dataBytesHeader;
79               byte[] sendbytes = telBeSend.dataBytes;
80
81               //数据超过缓冲区长度 会导致无法拆包
82               if (sendbytes.Length <= receiveBufferLenMax)
83               {
84                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
85                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0, null, null
86                        
87                     );
88               }
89               else
90               {
91                     Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
92                     throw new Exception("发送到调度客户端的数据超过缓冲区长度");
93               }
94
95               
96             }
97             else if (tel is Telegram_SDBMsg)
98             {
99
100             }
101
102         }
103         catch (Exception ex)
104         {
105             Close();
106             Console.WriteLine(ex.Message);
107             //throw ex;
108         }
109   }
110
111   public void BeginReceive()
112   {
113         receiveBufferLen = 0;
114         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
115             ReceiveCallback,
116            
117         this);
118   }
119   private void ReceiveCallback(IAsyncResult iar)
120   {
121         try
122         {
123             MsgClientSchedule cli = (MsgClientSchedule)iar.AsyncState;
124             Socket socket = cli.workingSocket;
125             int reads = socket.EndReceive(iar);
126
127             if (reads > 0)
128             {
129
130               for (int i = 0; i < reads; i++)
131               {
132                     receiveBuffer.Add(onceReadDatas);
133               }
134
135               //具体填充了多少看返回值 此时 数据已经在buffer中了
136
137               receiveBufferLen += reads;
138
139               //加完了后解析 阻塞式处理 结束后开启新的接收
140               SloveTelData();
141
142
143
144               if (receiveBufferLenMax - receiveBufferLen > 0)
145               {
146                     //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
147                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
148               }
149               else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
150               {
151                     Close();
152                     
153                     Console.WriteLine("服务端接口解析数据出现异常");
154                     throw new Exception("服务端接口解析数据出现异常");
155               }
156             }
157             else//reads==0客户端已关闭
158             {
159               Close();                  
160             }
161         }
162         catch (Exception ex)
163         {
164             Close();
165            
166             Console.WriteLine("ReceiveCallback Error");
167             Console.WriteLine(ex.Message);
168         }
169
170   }
171   private void SloveTelData()
172   {
173         
174         //进行数据解析
175         SloveTelDataUtil slo = new SloveTelDataUtil();
176
177         if (telType == TelgramType.Schedule)
178         {
179             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen,serverIP+":"+serverPort.ToString());
180             //buffer已经被处理一遍了 使用新的长度
181             receiveBufferLen = receiveBuffer.Count;
182             //解析出的每一个对象都触发 onreceive
183             for (int i = 0; i < dataEntitys.Count; i++)
184             {
185               if (onReceive != null)
186                     onReceive(dataEntitys);
187             }
188         }
189         else if (telType == TelgramType.SDBMsg)
190         {
191
192         }
193
194   }
195
196
197   public void Close()
198   {
199         try
200         {
201             _isConnected = false;
202
203             receiveBuffer.Clear();
204             receiveBufferLen = 0;
205             if (workingSocket != null && workingSocket.Connected)
206               workingSocket.Close();
207         }
208         catch (Exception ex)
209         {
210             Console.WriteLine(ex.Message);
211         }
212
213   }
214
215 }服务端调用

构建一个winform基本项目
1 List<string> clients;
2 MsgServerSchedule server;
3 private void btn_start_Click(object sender, EventArgs e)
4 {
5   server = new MsgServerSchedule(int.Parse(tbx_port.Text));
6
7
8   server.Start();
9   if (server.isRunning == true)
10   {
11         btn_start.Enabled = false;
12
13         server.onReceive += new Action<Telegram_Base>(
14         (tel) =>
15         {
16             this.BeginInvoke(new Action(() =>
17             {
18               if (tel is Telegram_Schedule)
19               {
20                     Telegram_Schedule ts = tel as Telegram_Schedule;
21                     ts.SloveToTel();
22                     Console.WriteLine(string.Format("commandType:{0}", ((ScheduleTelCommandType)ts.command).ToString()));
23
24                     tbx_msgs.Text += ts.remoteIPPort + ">" + ts.jsonStr + "\r\n";
25
26                     //数据回发测试
27                     string fromip = ts.remoteIPPort;
28                     string srcMsg = ts.jsonStr;
29                     string fromServerMsg = ts.jsonStr + " -from server";
30                     ts.jsonStr = fromServerMsg;
31
32
33                     //如果消息里有指向信息 则转送到对应的客户端
34                     if (clients != null)
35                     {
36                         string to = null;
37                         for (int i = 0; i < clients.Count; i++)
38                         {
39                           if (srcMsg.Contains(clients))
40                           {
41                                 to = clients;
42                                 break;
43                           }
44                         }
45
46                         if (to != null)
47                         {
48                           ts.remoteIPPort = to;
49                           string toMsg;
50                           //toMsg= srcMsg.Replace(to, "");
51                           toMsg = srcMsg.Replace(to, fromip);
52                           ts.jsonStr = toMsg;
53                           ts.SerialToBytes();
54
55                           server.SendToSomeOne(ts);
56                         }
57                         else
58                         {
59                           ts.SerialToBytes();
60                           server.SendToSomeOne(ts);
61                         }
62                     }
63               }
64             }));
65
66         }
67         );
68
69         server.onClientAddDel += new Action<List<string>>((onlines) =>
70         {
71             this.BeginInvoke(
72               new Action(() =>
73               {
74                     clients = onlines;
75                     listbox_clients.Items.Clear();
76
77                     for (int i = 0; i < onlines.Count; i++)
78                     {
79                         listbox_clients.Items.Add(onlines);
80                     }
81               }));
82         });
83   }
84 }
85 private void btn_sendAll_Click(object sender, EventArgs e)
86 {
87   Telegram_Schedule tel = new Telegram_Schedule();
88   tel.header = GlobalSymbol.HeaderSymbol;
89   tel.command = (int)ScheduleTelCommandType.StartC2S;
90   tel.jsonStr = tbx_sendAll.Text;
91   tel.SerialToBytes();
92
93   server.SendToAll(tel);
94 }客户端调用

1 MsgClientSchedule client;
2
3 private void btn_start_Click(object sender, EventArgs e)
4 {
5   client = new MsgClientSchedule(tbx_ip.Text, int.Parse(tbx_port.Text));
6
7   client.Connect();
8
9   if (client.isConnected == true)
10   {
11         btn_start.Enabled = false;
12         
13         label1.Text = client.localIPPort;
14
15         client.onReceive = new Action<Telegram_Base>((tel) =>
16         {
17             this.BeginInvoke(
18               new Action(() =>
19               {
20                     tel.SloveToTel();
21                     tbx_rec.Text += tel.jsonStr + "\r\n";
22
23               }));
24         });
25   }
26
27 }
28
29
30
31 private void btn_send_Click(object sender, EventArgs e)
32 {
33
34   if (client == null || client.isConnected == false)
35         return;
36
37   //for (int i = 0; i < 2; i++)
38   //{
39         Telegram_Schedule tel = new Telegram_Schedule();
40         tel.command = (int)ScheduleTelCommandType.MsgC2S;
41   
42         tel.jsonStr = tbx_remoteip.Text+">"+ tbx_msgSend.Text;
43         tel.SerialToBytes();//发出前要先序列化
44
45         client.Send(tel);
46   //}
47   
48 }实现效果

可以多客户端连接互相自由发送消息,服务端可以编写转发规则代码,那些什么棋牌啊 互动白板 以及其他类似的应用就可以基于此之上发挥想象了
https://img2023.cnblogs.com/blog/72285/202302/72285-20230225124906908-97578141.png
 

来源:https://www.cnblogs.com/assassinx/archive/2023/02/25/17154167.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 一个简易socket通信结构