|
服务端
工作需要又需要用到socketTCP通讯,这么多年了,终于稍微能写点了。让我说其实也说不出个啥来,看了很多的异步后稍微对异步socket的导流 endreceive后 再beginreceive 形成一个内循环有了个认识,加上我自己的封包拆包机制,然后再仿那些其它的大多数代码结构弄点onReceive事件进行 收包触发。整个过程就算差不多了 ,基本上是能够可靠运行的 靠谱的 中规中矩的,要说啥创新读到嘛真的谈不上。代码中写了很多low逼注释 也是为了方便自己理解 请无视。下面是server端代码,使用异步机制accept 异步receive ,成员有 clients代表当前在线的客户端 客户端socket包装为EndpointClient ,有onClientAddDel 代表客户端上线掉线事件,有onReceive代表所有客户端的收包事件,clients由于是异步的多线程访问就要涉及多线程管控 所以使用lock ,服务端有sendToAll() 和SendToSomeOne()毫无疑问这也是通过调用特定的clients来做的。
以下是服务端代码- 1 public class MsgServerSchedule
- 2 {
- 3
- 4
- 5 Socket serverSocket;
- 6 public Action<List<string>> onClientAddDel;
- 7 public Action<Telegram_Base> onReceive;
- 8 bool _isRunning = false;
- 9
- 10
- 11 int port;
- 12
- 13 public TelgramType telType;
- 14
- 15 static List<EndpointClient> clients;
- 16
- 17 public bool isRunning { get { return _isRunning; } }
- 18 public MsgServerSchedule(int _port)
- 19 {
- 20 //any 就决定了 ip地址格式是v4
- 21 //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654);
- 22 //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
- 23
- 24 this.port = _port;
- 25
- 26 clients = new List<EndpointClient>();
- 27
- 28 Console.WriteLine("constructor");
- 29
- 30 }
- 31
- 32 public void Start()
- 33 {
- 34 try
- 35 {
- 36 IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
- 37 serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
- 38 serverSocket.Bind(endPoint);
- 39 serverSocket.Listen(port);
- 40
- 41 serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket);
- 42
- 43 _isRunning = true;
- 44 Console.WriteLine("start");
- 45 }
- 46 catch (Exception ex)
- 47 {
- 48 _isRunning = false;
- 49 serverSocket = null;
- 50
- 51 Console.WriteLine("服务启动出现错误,可能端口已被占用:"+port);
- 52 Console.WriteLine(ex.Message);
- 53 }
- 54
- 55 }
- 56
- 57 public void Stop()
- 58 {
- 59 for (int i = 0; i < clients.Count; i++)
- 60 {
- 61 clients[i].Close();
- 62 }
- 63 ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll);
- 64 serverSocket.Close();
- 65 _isRunning = false;
- 66 }
- 67
- 68 public void SendToAll(Telegram_Base tel)
- 69 {
- 70 for (int i = 0; i < clients.Count; i++)
- 71 {
- 72 clients[i].Send(tel);
- 73 }
- 74 }
- 75
- 76 public void SendToSomeOne(Telegram_Base tel)
- 77 {
- 78 for (int i = 0; i < clients.Count; i++)
- 79 {
- 80 if(clients[i].remoteIPPort==tel.remoteIPPort)
- 81 {
- 82 clients[i].Send(tel);
- 83 break;
- 84 }
- 85 }
- 86 }
- 87
- 88 //新增与删除客户端 秉持原子操作
- 89 List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType)
- 90 {
- 91 //异步同时有新客户端上线 与下线 不进行资源互斥访问会报错
- 92 lock (this)
- 93 {
- 94 if (changeType == EndPointClientsChangeType.Add)
- 95 {
- 96 clients.Add(cli);
- 97 }
- 98 else if(changeType== EndPointClientsChangeType.Del)
- 99 {
- 100 var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort);
- 101 if (beRemoveClient != null)
- 102 clients.Remove(beRemoveClient);
- 103 }
- 104 else if(changeType== EndPointClientsChangeType.ClearAll)
- 105 {
- 106 clients.Clear();
- 107 }
- 108 else if (changeType == EndPointClientsChangeType.GetAll)
- 109 {
- 110 List<string> onLines = new List<string>(clients.Count);
- 111 for (int i = 0; i < clients.Count; i++)
- 112 {
- 113 onLines.Add(clients[i].remoteIPPort);
- 114 }
- 115 return onLines;
- 116 }
- 117 else
- 118 {
- 119 return null;
- 120 }
- 121 }
- 122 return null;
- 123 }
- 124 //异步监听客户端 有客户端到来时的回调
- 125 private void AcceptCallback(IAsyncResult iar)
- 126 {
- 127 //server端一直在receive 能够感知到客户端掉线 (连上后 关闭客户端 server立即有错误爆出)
- 128 //但是同情况 关闭server端 客户端无错误爆出 直到点发送 才有错误爆出
- 129 //由此得出 处于receive才会有掉线感知 ,send时发现发不出去自然也会有感知 跟人的正常思维理解是一样的
- 130 //虽然tcp是所谓的长连接 ,通过反复测试 ->但是双方相互都处在一个静止状态 是无法 确定在不在的
- 131 //连上后平常的情况下 并没有数据流通 的 ,双方只是一个状态的保持而已。
- 132 //这也是为什么 好多服务 客户端 程序 都有个心跳机制(由此我们可以想到继续完善 弄个客户端列表 心跳超时的剔除列表 正常发消息send 或receive 异常的剔除列表 删除clientSocket
- 133 //其实非要说吧 一般情况 服务端一直在receive 不用心跳其实也是可以的(客户端可能是真的需要
- 134 //tcp底层就已经有了一个判断对方在不在的机制 , 对方直接关程序 结束进程 这些 只要tcp在receive就立即能够感知 所以说心跳 用不用看情况吧
- 135
- 136 //tcp 不会丢包 哪怕是连接建立了 你还没开始receive 对方却先发了,
- 137 //对方只要是发了的数据 都由操作系统像个缓存样给你放那的 不会掉 你再隔10秒开始receive都能rec的到
- 138
- 139 //tcp甚至在拔掉网线 再重新插上 都可以保证数据一致性
- 140 //tcp的包顺序能够保证 先发的先到
- 141
- 142 //nures代码中那些beginreceivexxx 异步receive的核心机制就是 ,假定数据到的时候把数据保存到xxx数组
- 143 //真正endreceive的时候 其实数据已经接收 处理完成了
- 144
- 145 try
- 146 {
- 147
- 148 //处理完当前accept
- 149 Socket currentSocket = serverSocket.EndAccept(iar);
- 150
- 151 EndpointClient client = new EndpointClient(currentSocket,telType);
- 152
- 153 //新增客户端
- 154 ClientAddDelGetList(client, EndPointClientsChangeType.Add);
- 155
- 156 if (onClientAddDel != null)
- 157 {
- 158 List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
- 159 onClientAddDel(onlines);
- 160
- 161 //客户端异常掉线
- 162 client.onClientDel = new Action<string>((_remoteIPPort) =>
- 163 {
- 164 ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del);
- 165
- 166 List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
- 167 onClientAddDel(onlines2);
- 168 });
- 169 }
- 170
- 171
- 172
- 173 //这句到时调用完成后 就会自动把 receivebuffer填充 //要接收的字节数 系统底层操作一次接收多少字节 其实意义不大
- 174 //总是从0开始(就是说并发时会覆盖
- 175
- 176 Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString()));
- 177
- 178 //currentSocket.Close();
- 179 //Application.Exit();
- 180
- 181 //Thread.Sleep(1000 * 10);
- 182 client.onReceive += this.onReceive;
- 183
- 184 client.BeginReceive();
- 185
- 186
- 187 //立即开始accept新的客户端
- 188 if (isRunning == true && serverSocket != null)
- 189 serverSocket.BeginAccept(AcceptCallback, serverSocket);
- 190 //beginAccept 最开始的方法可以不一样 ,但最终肯定是一个不断accept的闭环过程
- 191 //整个过程就像个导流样 ,最开始用异步导流到一个固定的点 然后让其循环源源不断运转
- 192
- 193 //加asynccallback 有什么不一样么
- 194 //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket);
- 195
- 196 }
- 197 catch (Exception ex)
- 198 {
- 199 Console.WriteLine("AcceptCallback Error");
- 200 Console.WriteLine(ex.Message);
- 201 }
- 202
- 203 }
- 204
- 205
- 206 }
复制代码 EndpointClient终端代码代表客户端的对口人,他的onReceive 等资源从服务端继承 ,如果服务端想给某个特定客户端发数据则会调用他们中的某一个 毫无疑问这是通过remoteIPport来判断的,这些都是编写基本socket结构轻车熟路的老套路
以下EndpointClient代码- 1 public class EndpointClient
- 2 {
- 3 Socket workingSocket;
- 4 static int receiveBufferLenMax = 5000;
- 5 byte[] onceReadDatas = new byte[receiveBufferLenMax];
- 6 List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax);
- 7
- 8 public string remoteIPPort { get; set; }
- 9
- 10 //当前残留数据区 接收数据的起始指针(也代表缓冲区数据长度
- 11 int receiveBufferLen = 0;
- 12
- 13
- 14 TelgramType telType;
- 15
- 16 public Action<Telegram_Base> onReceive;
- 17 public Action<string> onClientDel;
- 18
- 19 public EndpointClient()
- 20 {
- 21
- 22 }
- 23 public EndpointClient(Socket _socket,TelgramType _telType)
- 24 {
- 25 this.remoteIPPort = _socket.RemoteEndPoint.ToString();
- 26 this.telType = _telType;
- 27 workingSocket = _socket;
- 28 }
- 29
- 30 public void Send(Telegram_Base tel)
- 31 {
- 32 //try
- 33 //{
- 34 if(workingSocket==null)
- 35 {
- 36 Console.WriteLine("未初始化的EndpointClient");
- 37 return;
- 38 }
- 39 if (tel is Telegram_Schedule)
- 40 {
- 41 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
- 42 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
- 43 {
- 44 Console.WriteLine("尝试发送数据长度格式错误的报文");
- 45 return;
- 46 }
- 47
- 48 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
- 49 byte[] sendbytes = telBeSend.dataBytes;
- 50
- 51 //数据超过缓冲区长度 会导致无法拆包
- 52 if (sendbytes.Length <= receiveBufferLenMax)
- 53 {
- 54 workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
- 55 workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null
- 56
- 57 );
- 58 }
- 59 else
- 60 {
- 61 Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
- 62 throw new Exception("发送到调度客户端的数据超过缓冲区长度");
- 63 }
- 64
- 65 }
- 66 else if (tel is Telegram_SDBMsg)
- 67 {
- 68
- 69 }
- 70
- 71 //}
- 72 //catch (Exception ex)
- 73 //{
- 74
- 75 // Console.WriteLine(ex.Message);
- 76 // throw ex;
- 77 //}
- 78 }
- 79
- 80 public void BeginReceive()
- 81 {
- 82 if (workingSocket == null)
- 83 {
- 84 Console.WriteLine("未初始化的EndpointClient");
- 85 return;
- 86 }
- 87
- 88 receiveBufferLen = 0;
- 89 workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
- 90 ReceiveCallback,
- 91 this);
- 92 }
- 93 private void ReceiveCallback(IAsyncResult iar)
- 94 {
- 95 try
- 96 {
- 97 EndpointClient cli = (EndpointClient)iar.AsyncState;
- 98 Socket socket = cli.workingSocket;
- 99 int reads = socket.EndReceive(iar);
- 100
- 101 if (reads > 0)
- 102 {
- 103
- 104 for (int i = 0; i < reads; i++)
- 105 {
- 106 receiveBuffer.Add(onceReadDatas[i]);
- 107 }
- 108
- 109 //具体填充了多少看返回值 此时 数据已经在buffer中了
- 110 receiveBufferLen += reads;
- 111 //加完了后解析 阻塞式处理 结束后开启新的接收
- 112 SloveTelData();
- 113
- 114 if (receiveBufferLenMax - receiveBufferLen > 0)
- 115 {
- 116 //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
- 117 socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
- 118 }
- 119 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
- 120 {
- 121 Close();
- 122 //移除自己
- 123 if (onClientDel != null)
- 124 {
- 125 onClientDel(remoteIPPort);
- 126 }
- 127 Console.WriteLine("服务端接口解析数据出现异常");
- 128 throw new Exception("服务端接口解析数据出现异常");
- 129 }
- 130 }
- 131 else//reads==0 客户端已关闭
- 132 {
- 133 Close();
- 134 //移除自己
- 135 if (onClientDel != null)
- 136 {
- 137 onClientDel(remoteIPPort);
- 138 }
- 139 }
- 140 }
- 141 catch (Exception ex)
- 142 {
- 143 Close();
- 144 //移除自己
- 145 if (onClientDel != null)
- 146 {
- 147 onClientDel(remoteIPPort);
- 148 }
- 149
- 150 Console.WriteLine("ReceiveCallback Error");
- 151 Console.WriteLine(ex.Message);
- 152 }
- 153
- 154 }
- 155 void SloveTelData()
- 156 {
- 157 //进行数据解析
- 158 SloveTelDataUtil slo = new SloveTelDataUtil();
- 159
- 160 if (telType == TelgramType.Schedule)
- 161 {
- 162 List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort);
- 163 //buffer已经被处理一遍了 使用新的长度
- 164 receiveBufferLen = receiveBuffer.Count;
- 165 //解析出的每一个对象都触发 onreceive
- 166 for (int i = 0; i < dataEntitys.Count; i++)
- 167 {
- 168 if (onReceive != null)
- 169 onReceive(dataEntitys[i]);
- 170 }
- 171 }
- 172 else if (telType == TelgramType.SDBMsg)
- 173 {
- 174
- 175 }
- 176
- 177 }
- 178
- 179
- 180 public void Close()
- 181 {
- 182 try
- 183 {
- 184 receiveBuffer.Clear();
- 185 receiveBufferLen = 0;
- 186 if (workingSocket != null && workingSocket.Connected)
- 187 workingSocket.Close();
- 188 }
- 189 catch (Exception ex)
- 190 {
- 191 Console.WriteLine(ex.Message);
- 192 }
- 193
- 194 }
- 195 }
复制代码 数据拆包与封包粘包处理
上面的代码可以看到 数据包处理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包后从缓冲区移除 解析多少个包触发多少次事件,残余数据留在缓冲区 然后继续开始新的beginReceive往缓冲区加。在异步机制中 到达endReceive的时候数据已经在缓冲区里了,这个自不用多说噻。数据包和粘包逻辑在公共类库里供客户端服务端共同调用
以下是粘包处理逻辑- 1 public class SloveTelDataUtil
- 2 {
- 3 List<Telegram_Schedule> solveList;
- 4 public SloveTelDataUtil()
- 5 {
- 6 }
- 7
- 8 List<byte> buffer;
- 9 int bufferLen;
- 10 int bufferIndex = 0;
- 11 string remoteIPPort;
- 12 public List<Telegram_Schedule> Slove_Telegram_Schedule( List< byte> _buffer,int _bufferLen,string _remoteIPPort)
- 13 {
- 14
- 15 solveList = new List<Telegram_Schedule>();
- 16
- 17 bufferIndex = 0;
- 18
- 19 buffer = _buffer;
- 20 bufferLen = _bufferLen;
- 21 remoteIPPort = _remoteIPPort;
- 22
- 23 //小于最小长度 直接返回
- 24 if (bufferLen < 12)
- 25 return solveList;
- 26
- 27 //进行数据解析
- 28 bool anaysisOK = false;
- 29 while (anaysisOK=AnaysisData_Schedule()==true)//直到解析的不能解析为止
- 30 {
- 31 }
- 32 return solveList;
- 33 }
- 34
- 35 public bool AnaysisData_Schedule()
- 36 {
- 37 if (bufferLen - bufferIndex < GlobalSymbol.Headerlen)
- 38 return false;
- 39
- 40 //解析出一个数据对象
- 41 Telegram_Schedule telObj = new Telegram_Schedule();
- 42
- 43 //必定是大于最小数据大小的
- 44 telObj.dataBytesHeader = new byte[GlobalSymbol.Headerlen];
- 45 buffer.CopyTo(bufferIndex, telObj.dataBytesHeader, 0, GlobalSymbol.Headerlen);
- 46
- 47 byte[] btsHeader = new byte[4];
- 48 byte[] btsCommand = new byte[4];
- 49 byte[] btsLen = new byte[4];
- 50
- 51 btsHeader[0] = buffer[bufferIndex];
- 52 btsHeader[1] = buffer[bufferIndex+1];
- 53 btsHeader[2] = buffer[bufferIndex+2];
- 54 btsHeader[3] = buffer[bufferIndex+3];
- 55
- 56 bufferIndex += 4;
- 57
- 58 btsCommand[0] = buffer[bufferIndex];
- 59 btsCommand[1] = buffer[bufferIndex + 1];
- 60 btsCommand[2] = buffer[bufferIndex + 2];
- 61 btsCommand[3] = buffer[bufferIndex + 3];
- 62
- 63 bufferIndex += 4;
- 64
- 65 btsLen[0] = buffer[bufferIndex];
- 66 btsLen[1] = buffer[bufferIndex + 1];
- 67 btsLen[2] = buffer[bufferIndex + 2];
- 68 btsLen[3] = buffer[bufferIndex + 3];
- 69
- 70 bufferIndex += 4;
- 71
- 72
- 73
- 74 int dataLen = BitConverter.ToInt32(btsLen, 0);
- 75 telObj.header = BitConverter.ToUInt32(btsHeader, 0);
- 76 telObj.command = BitConverter.ToInt32(btsCommand, 0);
- 77 telObj.remoteIPPort = remoteIPPort;
- 78
- 79 if(dataLen>0)
- 80 {
- 81 //数据区小于得到的数据长度 说明数据部分还没接收到 不删除缓冲区 不做任何处理
- 82 //下次来了连着头一起解析
- 83 if (bufferLen - GlobalSymbol.Headerlen < dataLen)
- 84 {
- 85
- 86 bufferIndex -= 12;//
- 87
- 88
- 89 return false;
- 90
- 91 }
- 92 else
- 93 {
- 94
- 95 telObj.dataLen = dataLen;
- 96 telObj.dataBytes = new byte[dataLen];
- 97 buffer.CopyTo(bufferIndex, telObj.dataBytes, 0, dataLen);
- 98
- 99 solveList.Add(telObj);
- 100 //bufferIndex += dataLen;
- 101
- 102 //解析成功一次 移除已解析的
- 103 for (int i = 0; i < GlobalSymbol.Headerlen+dataLen; i++)
- 104 {
- 105 buffer.RemoveAt(0);
- 106 }
- 107 bufferIndex = 0;
- 108 bufferLen = buffer.Count;
- 109 return true;
- 110 }
- 111 }
- 112 else
- 113 {
- 114
- 115 telObj.dataLen = 0;
- 116 solveList.Add(telObj);
- 117 //bufferIndex += 0;
- 118 //解析成功一次 移除已解析的
- 119 for (int i = 0; i < GlobalSymbol.Headerlen; i++)
- 120 {
- 121 buffer.RemoveAt(0);
- 122 }
- 123 //解析成功一次因为移除了缓冲区 bufferIndex置0
- 124 bufferIndex = 0;
- 125 bufferLen = buffer.Count;
- 126 return true;
- 127 }
- 128
- 129 }
- 130
- 131
- 132 public List<Telegram_SDBMsg> Slove_Telegram_SDBMsg(ref byte[] buffer)
- 133 {
- 134 return new List<Telegram_SDBMsg>();
- 135 }
- 136 }
复制代码 我们看到用到的数据包对象是Telegram_Schedule ,中间保存有报文数据,数据发送的目标等信息。
以下是数据包结构代码- 1 public class Telegram_Base
- 2 {
- 3 public string remoteIPPort { get; set; }
- 4 //数据内容
- 5 public byte[] dataBytes { get; set; }
- 6 //头部内容的序列化
- 7 public byte[] dataBytesHeader { get; set; }
- 8
- 9 public string jsonStr { get; set; }
- 10 virtual public void SerialToBytes()
- 11 {
- 12
- 13 }
- 14
- 15 virtual public void SloveToTel()
- 16 {
- 17
- 18 }
- 19
- 20 }
- 21
- 22 public class Telegram_Schedule:Telegram_Base
- 23 {
- 24
- 25 //头部标识 4字节
- 26 public UInt32 header { get; set; }
- 27 //命令对应枚举的 int 4字节
- 28 public int command { get; set; }
- 29 //数据长度 4字节
- 30 public int dataLen { get; set; }
- 31
- 32
- 33
- 34 override public void SerialToBytes()
- 35 {
- 36 //有字符串数据 但是待发送字节是空
- 37 if ((string.IsNullOrEmpty(jsonStr) == false ))//&& (dataBytes==null || dataBytes.Length==0)
- 38 {
- 39 dataBytes = Encoding.UTF8.GetBytes(jsonStr);
- 40 dataLen = dataBytes.Length;
- 41 dataBytesHeader = new byte[GlobalSymbol.Headerlen];
- 42
- 43 header = GlobalSymbol.HeaderSymbol;
- 44
- 45 byte[] btsHeader = BitConverter.GetBytes(header);
- 46 byte[] btsCommand = BitConverter.GetBytes(command);
- 47 byte[] btsLen = BitConverter.GetBytes(dataLen);
- 48
- 49 Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
- 50 Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
- 51 Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
- 52
- 53 }
- 54 else if((string.IsNullOrEmpty(jsonStr) == true )&& (dataBytes==null || dataBytes.Length==0)){
- 55 dataLen = 0;
- 56 dataBytes = new byte[0];
- 57
- 58 dataBytesHeader = new byte[GlobalSymbol.Headerlen];
- 59
- 60 header = GlobalSymbol.HeaderSymbol;
- 61
- 62 byte[] btsHeader = BitConverter.GetBytes(header);
- 63 byte[] btsCommand = BitConverter.GetBytes(command);
- 64 byte[] btsLen = BitConverter.GetBytes(dataLen);
- 65
- 66 Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
- 67 Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
- 68 Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
- 69 }
- 70 }
- 71
- 72 override public void SloveToTel()
- 73 {
- 74 //只解析字符串数据部分 ,header 和len 在接收之初就已解析
- 75 this.jsonStr = Encoding.UTF8.GetString(this.dataBytes);
- 76 }
- 77
- 78 }
复制代码 客户端代码
最后是客户端,有了上面的结构,客户端就不足为谈了,稍微了解socket的人都熟知套路的 基本跟EndpointClient一致- 1 public class MsgClientSchedule
- 2 {
- 3 Socket workingSocket;
- 4 //缓冲区最大数据长度
- 5 static int receiveBufferLenMax = 5000;
- 6 //单次receive数据(取决于tcp底层封包 但是不会超过缓冲区最大长度
- 7 byte[] onceReadDatas = new byte[receiveBufferLenMax];
- 8 //未解析到完整数据包时的残余数据保存区
- 9 List<byte> receiveBuffer = new List<byte>(receiveBufferLenMax);
- 10
- 11 string serverIP { get; set; }
- 12 int serverPort { get; set; }
- 13 public string localIPPort { get; set; }
- 14
- 15 //残余缓冲区数据长度
- 16 int receiveBufferLen = 0;
- 17
- 18 bool _isConnected { get; set; }
- 19
- 20 TelgramType telType;
- 21
- 22 //收一个包时触发
- 23 public Action<Telegram_Base> onReceive;
- 24 //与服务端断链时触发
- 25 public Action<string> onClientDel;
- 26
- 27
- 28 public bool isConnected { get { return _isConnected; } }
- 29 public MsgClientSchedule(string _serverIP,int _port)
- 30 {
- 31 serverIP = _serverIP;
- 32 serverPort = _port;
- 33 _isConnected = false;
- 34 }
- 35
- 36 public void Connect()
- 37 {
- 38 try
- 39 {
- 40 workingSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
- 41 IPEndPoint ipport = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);
- 42 workingSocket.Connect(ipport);
- 43
- 44 localIPPort = workingSocket.LocalEndPoint.ToString();
- 45 _isConnected = true;
- 46 BeginReceive();
- 47 }
- 48 catch (Exception ex)
- 49 {
- 50 workingSocket = null;
- 51 _isConnected = false;
- 52
- 53 Console.WriteLine(ex.Message);
- 54 }
- 55
- 56 }
- 57
- 58
- 59
- 60
- 61 public void Send(Telegram_Base tel)
- 62 {
- 63 try
- 64 {
- 65 if(_isConnected==false)
- 66 {
- 67 Console.WriteLine("未连接到服务器");
- 68 return;
- 69 }
- 70 if (tel is Telegram_Schedule)
- 71 {
- 72 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
- 73 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
- 74 {
- 75 Console.WriteLine("尝试发送数据长度格式错误的报文");
- 76 return;
- 77 }
- 78 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
- 79 byte[] sendbytes = telBeSend.dataBytes;
- 80
- 81 //数据超过缓冲区长度 会导致无法拆包
- 82 if (sendbytes.Length <= receiveBufferLenMax)
- 83 {
- 84 workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
- 85 workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0, null, null
- 86
- 87 );
- 88 }
- 89 else
- 90 {
- 91 Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
- 92 throw new Exception("发送到调度客户端的数据超过缓冲区长度");
- 93 }
- 94
- 95
- 96 }
- 97 else if (tel is Telegram_SDBMsg)
- 98 {
- 99
- 100 }
- 101
- 102 }
- 103 catch (Exception ex)
- 104 {
- 105 Close();
- 106 Console.WriteLine(ex.Message);
- 107 //throw ex;
- 108 }
- 109 }
- 110
- 111 public void BeginReceive()
- 112 {
- 113 receiveBufferLen = 0;
- 114 workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
- 115 ReceiveCallback,
- 116
- 117 this);
- 118 }
- 119 private void ReceiveCallback(IAsyncResult iar)
- 120 {
- 121 try
- 122 {
- 123 MsgClientSchedule cli = (MsgClientSchedule)iar.AsyncState;
- 124 Socket socket = cli.workingSocket;
- 125 int reads = socket.EndReceive(iar);
- 126
- 127 if (reads > 0)
- 128 {
- 129
- 130 for (int i = 0; i < reads; i++)
- 131 {
- 132 receiveBuffer.Add(onceReadDatas[i]);
- 133 }
- 134
- 135 //具体填充了多少看返回值 此时 数据已经在buffer中了
- 136
- 137 receiveBufferLen += reads;
- 138
- 139 //加完了后解析 阻塞式处理 结束后开启新的接收
- 140 SloveTelData();
- 141
- 142
- 143
- 144 if (receiveBufferLenMax - receiveBufferLen > 0)
- 145 {
- 146 //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
- 147 socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
- 148 }
- 149 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
- 150 {
- 151 Close();
- 152
- 153 Console.WriteLine("服务端接口解析数据出现异常");
- 154 throw new Exception("服务端接口解析数据出现异常");
- 155 }
- 156 }
- 157 else//reads==0客户端已关闭
- 158 {
- 159 Close();
- 160 }
- 161 }
- 162 catch (Exception ex)
- 163 {
- 164 Close();
- 165
- 166 Console.WriteLine("ReceiveCallback Error");
- 167 Console.WriteLine(ex.Message);
- 168 }
- 169
- 170 }
- 171 private void SloveTelData()
- 172 {
- 173
- 174 //进行数据解析
- 175 SloveTelDataUtil slo = new SloveTelDataUtil();
- 176
- 177 if (telType == TelgramType.Schedule)
- 178 {
- 179 List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen,serverIP+":"+serverPort.ToString());
- 180 //buffer已经被处理一遍了 使用新的长度
- 181 receiveBufferLen = receiveBuffer.Count;
- 182 //解析出的每一个对象都触发 onreceive
- 183 for (int i = 0; i < dataEntitys.Count; i++)
- 184 {
- 185 if (onReceive != null)
- 186 onReceive(dataEntitys[i]);
- 187 }
- 188 }
- 189 else if (telType == TelgramType.SDBMsg)
- 190 {
- 191
- 192 }
- 193
- 194 }
- 195
- 196
- 197 public void Close()
- 198 {
- 199 try
- 200 {
- 201 _isConnected = false;
- 202
- 203 receiveBuffer.Clear();
- 204 receiveBufferLen = 0;
- 205 if (workingSocket != null && workingSocket.Connected)
- 206 workingSocket.Close();
- 207 }
- 208 catch (Exception ex)
- 209 {
- 210 Console.WriteLine(ex.Message);
- 211 }
- 212
- 213 }
- 214
- 215 }
复制代码 服务端调用
构建一个winform基本项目- 1 List<string> clients;
- 2 MsgServerSchedule server;
- 3 private void btn_start_Click(object sender, EventArgs e)
- 4 {
- 5 server = new MsgServerSchedule(int.Parse(tbx_port.Text));
- 6
- 7
- 8 server.Start();
- 9 if (server.isRunning == true)
- 10 {
- 11 btn_start.Enabled = false;
- 12
- 13 server.onReceive += new Action<Telegram_Base>(
- 14 (tel) =>
- 15 {
- 16 this.BeginInvoke(new Action(() =>
- 17 {
- 18 if (tel is Telegram_Schedule)
- 19 {
- 20 Telegram_Schedule ts = tel as Telegram_Schedule;
- 21 ts.SloveToTel();
- 22 Console.WriteLine(string.Format("commandType:{0}", ((ScheduleTelCommandType)ts.command).ToString()));
- 23
- 24 tbx_msgs.Text += ts.remoteIPPort + ">" + ts.jsonStr + "\r\n";
- 25
- 26 //数据回发测试
- 27 string fromip = ts.remoteIPPort;
- 28 string srcMsg = ts.jsonStr;
- 29 string fromServerMsg = ts.jsonStr + " -from server";
- 30 ts.jsonStr = fromServerMsg;
- 31
- 32
- 33 //如果消息里有指向信息 则转送到对应的客户端
- 34 if (clients != null)
- 35 {
- 36 string to = null;
- 37 for (int i = 0; i < clients.Count; i++)
- 38 {
- 39 if (srcMsg.Contains(clients[i]))
- 40 {
- 41 to = clients[i];
- 42 break;
- 43 }
- 44 }
- 45
- 46 if (to != null)
- 47 {
- 48 ts.remoteIPPort = to;
- 49 string toMsg;
- 50 //toMsg= srcMsg.Replace(to, "");
- 51 toMsg = srcMsg.Replace(to, fromip);
- 52 ts.jsonStr = toMsg;
- 53 ts.SerialToBytes();
- 54
- 55 server.SendToSomeOne(ts);
- 56 }
- 57 else
- 58 {
- 59 ts.SerialToBytes();
- 60 server.SendToSomeOne(ts);
- 61 }
- 62 }
- 63 }
- 64 }));
- 65
- 66 }
- 67 );
- 68
- 69 server.onClientAddDel += new Action<List<string>>((onlines) =>
- 70 {
- 71 this.BeginInvoke(
- 72 new Action(() =>
- 73 {
- 74 clients = onlines;
- 75 listbox_clients.Items.Clear();
- 76
- 77 for (int i = 0; i < onlines.Count; i++)
- 78 {
- 79 listbox_clients.Items.Add(onlines[i]);
- 80 }
- 81 }));
- 82 });
- 83 }
- 84 }
- 85 private void btn_sendAll_Click(object sender, EventArgs e)
- 86 {
- 87 Telegram_Schedule tel = new Telegram_Schedule();
- 88 tel.header = GlobalSymbol.HeaderSymbol;
- 89 tel.command = (int)ScheduleTelCommandType.StartC2S;
- 90 tel.jsonStr = tbx_sendAll.Text;
- 91 tel.SerialToBytes();
- 92
- 93 server.SendToAll(tel);
- 94 }
复制代码 客户端调用
- 1 MsgClientSchedule client;
- 2
- 3 private void btn_start_Click(object sender, EventArgs e)
- 4 {
- 5 client = new MsgClientSchedule(tbx_ip.Text, int.Parse(tbx_port.Text));
- 6
- 7 client.Connect();
- 8
- 9 if (client.isConnected == true)
- 10 {
- 11 btn_start.Enabled = false;
- 12
- 13 label1.Text = client.localIPPort;
- 14
- 15 client.onReceive = new Action<Telegram_Base>((tel) =>
- 16 {
- 17 this.BeginInvoke(
- 18 new Action(() =>
- 19 {
- 20 tel.SloveToTel();
- 21 tbx_rec.Text += tel.jsonStr + "\r\n";
- 22
- 23 }));
- 24 });
- 25 }
- 26
- 27 }
- 28
- 29
- 30
- 31 private void btn_send_Click(object sender, EventArgs e)
- 32 {
- 33
- 34 if (client == null || client.isConnected == false)
- 35 return;
- 36
- 37 //for (int i = 0; i < 2; i++)
- 38 //{
- 39 Telegram_Schedule tel = new Telegram_Schedule();
- 40 tel.command = (int)ScheduleTelCommandType.MsgC2S;
- 41
- 42 tel.jsonStr = tbx_remoteip.Text+">"+ tbx_msgSend.Text;
- 43 tel.SerialToBytes();//发出前要先序列化
- 44
- 45 client.Send(tel);
- 46 //}
- 47
- 48 }
复制代码 实现效果
可以多客户端连接互相自由发送消息,服务端可以编写转发规则代码,那些什么棋牌啊 互动白板 以及其他类似的应用就可以基于此之上发挥想象了
来源:https://www.cnblogs.com/assassinx/archive/2023/02/25/17154167.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|