翼度科技»论坛 编程开发 .net 查看内容

PasteSpider的集群组件PasteCluster(让你的项目快速支持集群模式)的思路及

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
PasteSpider是什么?
  1. 一款使用.net编写的开源的Linux容器部署助手,支持一键发布,平滑升级,自动伸缩,
  2. Key-Value配置,项目网关,环境隔离,运行报表,差量升级,私有仓库,集群部署,版本管理等!
  3. 30分钟上手,让开发也可以很容易的学会在linux上部署你得项目!
  4. [从需求角度介绍PasteSpider(K8S平替部署工具适合于任何开发语言)]
  5. (https://blog.csdn.net/Apeart/article/details/138819197?spm=1001.2014.3001.5501)
复制代码
PasteCluster又是什么?
  1. 一套使用.net编写的中间件,如果你的API项目是.NET的,那么通过引入这个组件可以让你的项目快速支持集群模式!
  2. (部署是部署工具和写法的问题,模式是模式哈,有主从的那个是模式)
复制代码

接入现有项目的案例


如上载入必要的配置,关于ClusterConfig的内容有啥,需要查看源码PasteCloveConfig.cs这个文件,大概有十多个配置项,也可以使用默认值哈!
这个配置在appsettings.json是这样的

PasteClusterHandlr这个逻辑他会产生一个Channel的异步队列,业务代码上处理这个业务队列即可,比方说我们启动一个HostedService来处理这个队列

上面核心代码其实只有2行
  1.             //如果已知当前节点的Host(这里示例默认为80端口)信息,可以用下方的函数写入,也可以在启动的时候写入配置中
  2.             // -e ClusterConfig:CurrentHost="http://192.168.1.100"
  3.             //如果使用PasteSpider部署,则是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}"
  4.             //_cluster_handler.Register("http://192.168.1.100",0,0);
  5.             //启动集群中的当前节点(在这之前必须要确认当前节点的host是多少)
  6.             _cluster_handler.StartCluster();
复制代码
  1.             //读取集群产生的数据,比如其他节点发送的数据等,这里一般是业务相关的消息
  2.             ReadClusterChannel();
复制代码
其他部分概括下就是写入集群节点列表,写入当前节点的HOST信息的!
你只要关注你的业务代码如下:

就是这个default的分支,严格来说应该是msg_type=0或者大于10的(1-10组件用于通知特定消息给业务,至于业务用不用要基于自己的业务需求,比如我上面这个案例就是节点掉线后把他从集群节点列表种删除(记录在redis种))
在最后将会贴出源码
集群需求过程

如果说K8S是对运维人员的部署工具,那么PasteSpider应该就是针对开发人员的部署助手了!
K8S不是有那个主从集群模式么,一开始那会思考也整一个集群模式,这样一个PasteSpider宕机了,还有其他的来承担任务!
去中心化集群

一开始的思路是去中心化的集群模式,说白点就是数据库都是一个节点一个的,当然这里的去中心化和市面上的那个链不是一回事哈,至少没有链衔接和链校验的问题!
假设一个节点就是一套完整的PasteSpider,包括数据库,对应的api和web管理端等
如何保证集群安全性?

集群内的所有节点通讯使用一个统一的密钥,这样可以防止其他集群乱入!
数据防重和数据唯一?

首先一个,数据库的表我分几种:
1.基础表,比如会员信息表,这种基础表肯定每一个节点都要同步的
2.统计表,这个明显的代表就是报表,比如日报表,这样要报表的时候不需要从日志表中读取统计
3.关系表,一般的是基础表和基础表的关系的描述的表,也是要同步的
4.日志表,记录一些操作的,视情况而定,不一定要同步完全
每个表添加3个字段,节点,创建时间戳,更新时间戳,这样在同步数据的时候可以基于这个时间戳获取数据差,然后根据ID规则同步数据
由于对管理端来说,只要一个PasteSpider在工作,也就是Nginx中配置主备用模式,这样也就是说主要的数据都是通过一个节点新建的,这样可以排除数据重复的问题
比如说,整个系统的项目的代码需要唯一!
辅助组件如何做到集群

比如项目使用到了nginx,使用到了redis等,如果要完整的集群,甚至是多台服务器,其实严格的集群我个人认为是不可能的!
如果可能,那么每年就不会有那么多这个服务宕机多久,那个宕机多久了!
市面上没有100%的灾备方案,因为在追求100%的路上,你会发觉一直在套娃... .. .
普通的集群的现状


稍微查询下目前比较流程的一些中间件,用到的集群模式大概有以下要点:
1.集群选举过程中,不对外提供服务,可以理解成集群还没有协商一致好!那么引申的问题就是集群选举越快越好!
2.几乎的集群选举采用的是单数,过半模式!看过各种解释,说白点就是要少数服从多数,我个人觉得这个问题很扯的!
集群的数量发生变动了,这个是不可控的吧!
居然不可控,你咋保证宕机的数量一定是单数?那肯定单数复数都有可能,而且概率还很大,那这半数了个寂寞!
3.很多集群模式引入了多角色,比如蜂王,工蜂,监控蜂... .. . 啥意思?难不成只有工蜂和封王会宕机???如果你说不同角色宕机的概率不一样,那我还是信的,毕竟负载不一样!
如果都会宕机,那角色越多是不是意味着集群恢复的逻辑越复杂?角色切换来切换去好玩?
4.还有一种半通的集群情况,比如A,B,C,D,E,其中A只和B,C能通,其他的都能互通!
这什么神奇组合,这种的我思考来思考去,还不如你搞2个集群,然后弄一个中转,或者是业务需求变更下,因为为了实现这个半通,付出的代价太大了!
PasteCluster的方式

1.采用N个节点模式,N大于等于1即可,1的时候表示自己是节点,自己担任任何角色(其实也就2个角色,master和cluster角色)
这里2个角色的区别主要是在业务上,比方说你集群中的某些任务,需要有一个人专门管理,那么这个任务就交给master来处理
2.不采用选举模式,采用竞选模式,当发生2个竞选冲突的时候,基于vote_time时间戳和id来确定谁是master!
假设有A,B,C,D,E节点,如果当前A为master!
停止A,则会发生


B,C,D,E在不同时间点会发觉A不在了,这里的发现可能并发也可能是按照一定的顺序,取决于他们的轮询线
假设B先发现A出问题了,立马给自己的vote_time赋值时间戳ms,然后问(除了自己外的其他节点)你有master么?
假设被问的是C,他这个时候还没有发现A挂了,
他立马健康检查A,
如果通了,则返回A的信息给B,告诉他当前的master是A


如果不通,他则给自己的vote_time赋值,然后告诉B,C就是master!B收到返回后,就把C设为master
C这个时候会进入选举阶段,会把自己的master(C)信息群发给其他节点
其他节点收到后,会和自己的master做健康检查,如果接通的话,再做vote_time对比,结合节点Id选举出最小的返回给C,
C这个时候收到的结果,如果返回的master(E)(这个是对比vote_time的结果)不是C他这个节点,则C放弃自己作为节点,接受返回的master(E)为master!自己做cluster的角色
上述居然有返回E,那么表示E也启动了选举,只是在和C的对比中E胜出了,C由于在遍历节点的时候失败了,所以中途退出了,也就是E在遍历的过程中完成了一整个遍历,没有被中断!


E在完成遍历后,群发一条消息给所有节点,告知我是新的Master(E)
以上整个选举过程就算完成,从时间上来说理论上是一个集群遍历的是时间,不需要其他集群模式的反复选举!
一般的是谁先发现,然后下一个节点选举胜出!主要是这个问的环节,如果没有问,直接进入竞,复杂度还会提升!
集群问答

1.集群节点保活问题

所有的集群节点都有心跳模式,因为你没办法知道对方下线没有,所以就有一个轮询,一定时间问对方在线否,PasteCluster采用双心跳,心跳复用模式!
双心跳是指master会定期向其他节点轮询是否在线
节点也会问轮询问master是否在线
复用模式在于,每次检查,只有他们之间的交互时间大于设定的时间后才会发送这个心跳检查,比如设定10秒钟!如果近期10秒钟这2个节点有通讯,则这一次心跳不在问询的任务中!
2.如何防止小集群出现(脑裂?)

之前第一次写的时候,后面测试发现会出现有多个master的情况,其实把整个选举过程拆开,每一个步骤延长时间,就很好理解,出现多个master是可能的!
在节点的交互过程中,比如A节点发送信息给B节点,A的请求会附带至少2个信息,A的master是谁,A的所在集群节点的总数,发送给B后,B会对这2个信息进行校验,如何和A的不一致,则B会抛弃自己的master信息,转而进入“问”阶段,再根据这个问的结果,加入集群!
3.啥时候发生加入集群这个动作

加入集群有2种情况,
1.是刚刚启动的时候,肯定得找个集群去加入,虽然有可能整个系统就你一个节点,过程嘛总要走的,说不定有其他节点(节点能通)呢!
2.在数据交互过程中,发现信息不对称,抛弃自己得信息加入到现有得一个主节点
4.啥时候移除节点

1.节点离线的时候主动触发,主动向master告知我这个节点离线了,master重新整理节点信息后,群发给所有其他在线的节点,从而更新整个集群!
2.在节点交互过程中,通讯错误次数超过设定的次数,这个交互包括数据通讯或者定时的健康检查,最终在master健康检查的时候移除这个节点,然后广播
5.给集群分配节点列表信息

1.通过配置文件的方式在启动的时候导入
2.通过AddNodeToList在启动后添加,比如你通过读取Redis获得节点列表,然后导入
6.找到当前节点的IP等信息

节点之间是通过HOSTIP等通讯的,所以最基本的就是需要知道当前节点是哪个IP,有2种模式
1.已知节点列表有10个IP,现在是不知道哪个节点是哪个,把节点列表写入到当前节点信息后,然后调用FindMe(),系统会去遍历列表,从而找到哪个节点信息是当前节点,你可以使用
/api/cluster/status查看自己当前节点是多少来确定是否查找完成!
2.在服务启动的时候给导入IP信息,比如如果你使用PasteSpider部署的项目,而且项目配置了项目网关,则可以在对应服务的配置中添加如下代码即可:

如果是非80端口要自己修改下!看配置其实就是在启动的额时候修改配置项ClusterConfig:CurrentHost的值!
7.手动加入一个现有集群

加入一个集群有2个步骤,一个是给当前自己打标签,也就是为自己设定host等信息,然后是ScanMaster();也就是当前节点知道自己是谁后,然后去查找当前的集群信息,然后加入这个集群!
8.主动退出一个集群


其实你也可以主动调用Leave函数实现离开集群!
PasteCluste组件的源码

你可以从Gitee拉取最新源码 PasteCluster组件源码和使用案例
拉取后打开项目,只需要关注如下代码

其他的子项目是因为这个使用案例是使用PasteTemplate生成的,懒得精简了,看下案例应该就知道如何使用到自己项目中了!
源码解说

基于上图,主要文件是2个
1.PasteClusterController.cs

看文件命名就知道,他是一个Controller,用于节点对外通讯的,每一个接口都有添加header校验,用于校验请求是否属于这个集群,在最后代码处:

要安全点的话,你可以加入那个time_temptoken,加密校验,类似OAUTH那个协议模式,过期了丢弃这个token
上图的fromnode用于修改最后交互时间,减少心跳中不必要的检查,
masternode则用于校验是否脑裂?也就是是否有多个master的情况,发现后及时处理!

在配置路由转发的时候(nginx的配置),需要为这个路径做代理
源码如下:
  1. using Microsoft.AspNetCore.Mvc;
  2. using Microsoft.Extensions.Options;
  3. using Newtonsoft.Json.Linq;
  4. namespace PasteCluster
  5. {
  6.     /// <summary>
  7.     ///
  8.     /// </summary>
  9.     [ApiController]
  10.     [Route("/api/cluster/[action]")]
  11.     public class PasteClusterController : Controller
  12.     {
  13.         private PasteClusterHandler _handler;
  14.         private PasteSloveConfig _config;
  15.         /// <summary>
  16.         ///
  17.         /// </summary>
  18.         /// <param name="handler"></param>
  19.         /// <param name="config"></param>
  20.         public PasteClusterController(PasteClusterHandler handler, IOptions<PasteSloveConfig> config)
  21.         {
  22.             _config = config.Value;
  23.             _handler = handler;
  24.         }
  25.         /// <summary>
  26.         /// 健康检查
  27.         /// </summary>
  28.         /// <returns></returns>
  29.         [HttpGet]
  30.         public PasteApiResponse health()
  31.         {
  32.             CheckHeader();
  33.             return _handler.Health();
  34.         }
  35.         /// <summary>
  36.         /// 读取当前状态
  37.         /// </summary>
  38.         /// <returns></returns>
  39.         [HttpGet]
  40.         public string status()
  41.         {
  42.             return _handler.Status();
  43.         }
  44.         /// <summary>
  45.         /// 问询master信息
  46.         /// </summary>
  47.         /// <returns></returns>
  48.         [HttpGet]
  49.         public PasteApiResponse ask()
  50.         {
  51.             CheckHeader();
  52.             return _handler.Ask();
  53.         }
  54.         /// <summary>
  55.         /// 查找自己
  56.         /// </summary>
  57.         /// <param name="input"></param>
  58.         /// <returns></returns>
  59.         [HttpPost]
  60.         public async Task<PasteApiResponse> find(PasteNodeModel input)
  61.         {
  62.             CheckHeader();
  63.             return await _handler.Find(input);
  64.         }
  65.         /// <summary>
  66.         /// 有节点加入
  67.         /// </summary>
  68.         /// <param name="input"></param>
  69.         /// <returns></returns>
  70.         [HttpPost]
  71.         public PasteApiResponse join(PasteNodeModel input)
  72.         {
  73.             CheckHeader();
  74.             _handler.Join(input);
  75.             return _handler.State();
  76.         }
  77.         /// <summary>
  78.         /// 转发消息
  79.         /// </summary>
  80.         /// <param name="input"></param>
  81.         /// <returns></returns>
  82.         [HttpPost]
  83.         public async Task<PasteApiResponse> msg(PasteSloveMessage input)
  84.         {
  85.             CheckHeader();
  86.             input.from_api = true;
  87.             return await _handler.Msg(input);
  88.         }
  89.         /// <summary>
  90.         /// 往集群中发送消息
  91.         /// </summary>
  92.         /// <param name="input"></param>
  93.         /// <returns></returns>
  94.         [HttpPost]
  95.         public PasteApiResponse push(PasteSloveMessage input)
  96.         {
  97.             CheckHeader();
  98.             _handler.PushMsgToNode(input.body);
  99.             return _handler.State();
  100.         }
  101.         /// <summary>
  102.         /// 这个一般是PasteSpider推送过来的信息
  103.         /// </summary>
  104.         /// <param name="input"></param>
  105.         /// <returns></returns>
  106.         [HttpPost]
  107.         public PasteApiResponse run()
  108.         {
  109.             JObject info = null;
  110.             if (base.Request.Body != null)
  111.             {
  112.                 if (base.Request.Body != null)
  113.                 {
  114.                     using var stream = new StreamReader(base.Request.Body);
  115.                     var bodystr = stream.ReadToEndAsync().GetAwaiter().GetResult();
  116.                     //Console.WriteLine(bodystr);
  117.                     info = Newtonsoft.Json.JsonConvert.DeserializeObject<JObject>(bodystr);
  118.                 }
  119.             }
  120.             if (info != null)
  121.             {
  122.                 var _host = string.Empty;
  123.                 var _port = 80;
  124.                 JObject input = info;
  125.                 //if(info is JObject)
  126.                 //{
  127.                 //    input = input as JObject;
  128.                 //}
  129.                 //else
  130.                 //{
  131.                 //    Console.WriteLine("input is not JContainer");
  132.                 //    return _handler.State();
  133.                 //}
  134.                 if (input.ContainsKey("proAddress"))
  135.                 {
  136.                     _host = input["proAddress"].ToString();
  137.                 }
  138.                 if (input.ContainsKey("extendService"))
  139.                 {
  140.                     var _ser = input["extendService"];
  141.                     if (_ser.Type == JTokenType.Object)
  142.                     {
  143.                         var _serobj = _ser as JObject;
  144.                         if (_serobj.ContainsKey("listenPorts"))
  145.                         {
  146.                             var ports = _serobj["listenPorts"].ToString();
  147.                             if (!String.IsNullOrEmpty(ports))
  148.                             {
  149.                                 int.TryParse(ports.Split(',')[0], out var _pp);
  150.                                 if (_pp > 0)
  151.                                 {
  152.                                     _port = _pp;
  153.                                 }
  154.                             }
  155.                         }
  156.                     }
  157.                 }
  158.                 if (!String.IsNullOrEmpty(_host))
  159.                 {
  160.                     if (_port != 80)
  161.                     {
  162.                         _handler.Register($"http://{_host}:{_port}", 0, 0);
  163.                     }
  164.                     else
  165.                     {
  166.                         _handler.Register($"http://{_host}", 0, 0);
  167.                     }
  168.                 }
  169.                 else
  170.                 {
  171.                     Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(input));
  172.                 }
  173.             }
  174.             return _handler.State();
  175.         }
  176.         /// <summary>
  177.         /// 手动注册自己
  178.         /// </summary>
  179.         /// <param name="input"></param>
  180.         /// <returns></returns>
  181.         [HttpGet]
  182.         public string joinin([FromQuery] PasteNodeModel input, string token)
  183.         {
  184.             if (token == _config.SloveToken)
  185.             {
  186.                 _handler.Register(input.host, input.id, input.group);
  187.             }
  188.             else
  189.             {
  190.                 return "token参数错误!";
  191.             }
  192.             return "手动加入!";
  193.         }
  194.         /// <summary>
  195.         /// 加入某一个节点,当前需要为master
  196.         /// </summary>
  197.         /// <param name="input"></param>
  198.         /// <returns></returns>
  199.         [HttpGet]
  200.         public async Task<string> addone([FromQuery] PasteNodeModel input, string token)
  201.         {
  202.             if (token == _config.SloveToken)
  203.             {
  204.                 if (_handler.CurrentIsMaster || _handler.IsSingle)
  205.                 {
  206.                     var node = new PasteNodeModel
  207.                     {
  208.                         host = input.host,
  209.                         id = input.id,
  210.                         group = input.group,
  211.                     };
  212.                     await _handler.AddNodeToList(node);
  213.                 }
  214.                 else
  215.                 {
  216.                     return "当前节点不是Master,无法执行这个命令";
  217.                 }
  218.             }
  219.             else
  220.             {
  221.                 return "token参数错误!";
  222.             }
  223.             return "手动加入!";
  224.         }
  225.         /// <summary>
  226.         /// 节点集合
  227.         /// </summary>
  228.         /// <returns></returns>
  229.         [HttpGet]
  230.         public PasteNodeModel[] nodes()
  231.         {
  232.             return _handler.Nodes();
  233.         }
  234.         /// <summary>
  235.         /// 执行选举
  236.         /// </summary>
  237.         /// <param name="input"></param>
  238.         /// <returns></returns>
  239.         [HttpPost]
  240.         public async Task<PasteApiResponse> vote(PasteNodeModel input)
  241.         {
  242.             CheckHeader();
  243.             return await _handler.Vote(input);
  244.         }
  245.         /// <summary>
  246.         /// 被通知,有成为master
  247.         /// </summary>
  248.         /// <param name="input"></param>
  249.         /// <returns></returns>
  250.         [HttpPost]
  251.         public async Task<PasteApiResponse> master(PasteMasterResponse input)
  252.         {
  253.             CheckHeader();
  254.             //Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(input));
  255.             return await _handler.Master(input);
  256.         }
  257.         /// <summary>
  258.         /// 有节点离开
  259.         /// </summary>
  260.         /// <param name="input"></param>
  261.         /// <returns></returns>
  262.         [HttpPost]
  263.         public async Task<PasteApiResponse> leave(PasteNodeModel input)
  264.         {
  265.             CheckHeader();
  266.             return await _handler.Leave(input);
  267.         }
  268.         /// <summary>
  269.         /// 移除某节点
  270.         /// </summary>
  271.         /// <param name="input"></param>
  272.         /// <returns></returns>
  273.         [HttpPost]
  274.         public async Task<PasteApiResponse> remove(PasteNodeModel input)
  275.         {
  276.             CheckHeader();
  277.             return await _handler.Remove(input);
  278.         }
  279.         /// <summary>
  280.         /// 检查信息是否合法
  281.         /// </summary>
  282.         /// <exception cref="Exception"></exception>
  283.         private void CheckHeader()
  284.         {
  285.             if (base.Request.Headers.ContainsKey("slovetoken"))
  286.             {
  287.                 if (base.Request.Headers["slovetoken"] != _config.SloveToken)
  288.                 {
  289.                     throw new Exception("slovetoken错误,请重新输入!");
  290.                 }
  291.                 else
  292.                 {
  293.                     if (base.Request.Headers.ContainsKey("fromnode"))
  294.                     {
  295.                         _handler.TargetMasterLast(base.Request.Headers["fromnode"].ToString());
  296.                     }
  297.                     if (base.Request.Headers.ContainsKey("masternode"))
  298.                     {
  299.                         _handler.ConfirmMaster(base.Request.Headers["masternode"].ToString());
  300.                     }
  301.                 }
  302.             }
  303.             else
  304.             {
  305.                 throw new Exception("slovetoken不能为空,请重新输入!");
  306.             }
  307.         }
  308.     }
  309. }
复制代码
2.PasteClusterHandler.cs

这个文件就是集群选举,维护等的主要逻辑,里面使用了2个Channel异步队列,一个是这个集群组件内部使用的,有写入数据和读取数据,另外一个就是给业务用的了,在上面的案例中的HostedService中有体现!
整个选举过程和修复等,可以查看这个文件的源码,有疑问的麻烦在评论中回复!!!
源码内容:
  1. using System.Net;
  2. using System.Text;
  3. using System.Threading.Channels;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Options;
  6. namespace PasteCluster
  7. {
  8.     /// <summary>
  9.     /// 注意用IOC单例注入系统,多处调用
  10.     /// </summary>
  11.     public class PasteClusterHandler : IDisposable
  12.     {
  13.         /// <summary>
  14.         ///
  15.         /// </summary>
  16.         private readonly IHttpClientFactory _httpClientFactory;
  17.         /// <summary>
  18.         /// 用于内部消息中转等 节点中的任务
  19.         /// </summary>
  20.         private Channel<PasteEventModel> _channel_slove;
  21.         /// <summary>
  22.         /// 业务队列 表示队列中的消息要业务逻辑处理,当前可能为slove也可能为master
  23.         /// 业务监听队列
  24.         /// 表示业务要处理的消息
  25.         /// 当前节点可能是slove节点
  26.         /// 也可能是master节点
  27.         /// </summary>
  28.         public Channel<PasteSloveMessage> ChannelCluster;
  29.         /// <summary>
  30.         /// 当前节点信息
  31.         /// </summary>
  32.         private PasteNodeModel _current_node = null;
  33.         /// <summary>
  34.         /// 当前的Master是哪个
  35.         /// </summary>
  36.         private PasteNodeModel _current_master = null;
  37.         /// <summary>
  38.         ///
  39.         /// </summary>
  40.         private ILogger<PasteClusterHandler> _logger = null;
  41.         /// <summary>
  42.         /// 当前是否是主节点
  43.         /// </summary>
  44.         public bool CurrentIsMaster
  45.         {
  46.             get
  47.             {
  48.                 if (_current_master != null && _current_node != null)
  49.                 {
  50.                     return _current_master.host == _current_node.host;
  51.                 }
  52.                 return false;
  53.             }
  54.         }
  55.         /// <summary>
  56.         /// 是否是单例模式
  57.         /// 当前节点为null
  58.         /// 或者没有节点列表
  59.         /// </summary>
  60.         public bool IsSingle
  61.         {
  62.             get
  63.             {
  64.                 if (_current_master == null || _current_node == null)
  65.                 {
  66.                     return true;
  67.                 }
  68.                 if (node_list == null || node_list.Count <= 1)
  69.                 {
  70.                     return true;
  71.                 }
  72.                 return false;
  73.             }
  74.         }
  75.         /// <summary>
  76.         /// 当前节点的名称 如果为空表示没有或者本身配置就是空
  77.         /// </summary>
  78.         public string CurrentName
  79.         {
  80.             get
  81.             {
  82.                 if (_current_node != null)
  83.                 {
  84.                     return _current_node.name;
  85.                 }
  86.                 return "";
  87.             }
  88.         }
  89.         /// <summary>
  90.         /// 获取或设置当前集群的密钥 如果要修改这个需要在所有的调用之前配置
  91.         /// </summary>
  92.         public string SloveToken
  93.         {
  94.             get
  95.             {
  96.                 return _config.SloveToken;
  97.             }
  98.             set
  99.             {
  100.                 _config.SloveToken = value;
  101.             }
  102.         }
  103.         /// <summary>
  104.         /// 当前节点的名称 如果为空表示没有或者本身配置就是空
  105.         /// </summary>
  106.         public string CurrentHost
  107.         {
  108.             get
  109.             {
  110.                 if (_current_node != null)
  111.                 {
  112.                     return _current_node.host;
  113.                 }
  114.                 return "";
  115.             }
  116.         }
  117.         /// <summary>
  118.         /// 当前节点
  119.         /// </summary>
  120.         public PasteNodeModel CurrentNode
  121.         {
  122.             get
  123.             {
  124.                 return _current_node;
  125.             }
  126.         }
  127.         /// <summary>
  128.         /// 当前节点的名称 如果为空表示没有或者本身配置就是空
  129.         /// </summary>
  130.         public int CurrentId
  131.         {
  132.             get
  133.             {
  134.                 if (_current_node != null)
  135.                 {
  136.                     return _current_node.id;
  137.                 }
  138.                 return 0;
  139.             }
  140.         }
  141.         /// <summary>
  142.         /// 当前配置信息
  143.         /// </summary>
  144.         private PasteSloveConfig _config;
  145.         /// <summary>
  146.         ///
  147.         /// </summary>
  148.         private List<PasteNodeModel> node_list = null;
  149.         /// <summary>
  150.         /// 是否在选举中
  151.         /// </summary>
  152.         private bool voting = false;
  153.         /// <summary>
  154.         ///
  155.         /// </summary>
  156.         private System.Timers.Timer _timer;
  157.         /// <summary>
  158.         /// 当前节点的代码,初始值为空,可以被赋值,被赋值后不能再次被赋值
  159.         /// </summary>
  160.         private string node_code = String.Empty;
  161.         /// <summary>
  162.         /// 暂存发往节点的消息,这个是指消息,业务上的消息
  163.         /// </summary>
  164.         private List<PasteEventModel> ziplist = null;
  165.         /// <summary>
  166.         ///
  167.         /// </summary>
  168.         private System.Threading.SemaphoreSlim _semaphoreSlim;
  169.         /// <summary>
  170.         ///
  171.         /// </summary>
  172.         private long msg_count = 0;
  173.         /// <summary>
  174.         ///
  175.         /// </summary>
  176.         public PasteClusterHandler(ILogger<PasteClusterHandler> logger, IOptions<PasteSloveConfig> config, IHttpClientFactory httpClientFactory)
  177.         {
  178.             _logger = logger;
  179.             _config = config.Value;
  180.             _httpClientFactory = httpClientFactory;
  181.             _semaphoreSlim = new SemaphoreSlim(1);
  182.             ziplist = new List<PasteEventModel>();
  183.             _channel_slove = Channel.CreateBounded<PasteEventModel>(_config.SloveChannelMsgCapacity);
  184.             ChannelCluster = Channel.CreateBounded<PasteSloveMessage>(_config.MasterChannelMsgCapacity);
  185.             _timer = new System.Timers.Timer(1000);
  186.             _timer.Elapsed += _timer_Elapsed;
  187.             _timer.AutoReset = true;
  188.             _timer.Start();
  189.             ActionSloveEvent();
  190.         }
  191.         /// <summary>
  192.         /// 当前计数的次数
  193.         /// </summary>
  194.         private int _tick_index = 0;
  195.         /// <summary>
  196.         /// 给当前节点赋值code
  197.         /// </summary>
  198.         /// <param name="code"></param>
  199.         /// <returns>是否赋值成功</returns>
  200.         public bool SetNodeCode(string code)
  201.         {
  202.             if (String.IsNullOrEmpty(node_code))
  203.             {
  204.                 node_code = code;
  205.             }
  206.             else
  207.             {
  208.                 return false;
  209.             }
  210.             return true;
  211.         }
  212.         /// <summary>
  213.         /// 本地用于读取节点列表
  214.         /// </summary>
  215.         /// <returns></returns>
  216.         public PasteNodeModel[] Nodes()
  217.         {
  218.             if (node_list != null)
  219.             {
  220.                 return node_list.ToArray();
  221.             }
  222.             return null;
  223.         }
  224.         /// <summary>
  225.         /// 启动集群
  226.         /// 读取ClusterHost作为集群列表
  227.         /// 读取CurrentHost作为当前节点得Host
  228.         /// 如果有NodeList和NodeCode(CurrentCode)触发查找自己
  229.         /// 否则触发查找Master
  230.         /// </summary>
  231.         public async void StartCluster()
  232.         {
  233.             if (!string.IsNullOrEmpty(_config.CurrentHost))
  234.             {
  235.                 Register(_config.CurrentHost, 0, 0);
  236.             }
  237.             if (!String.IsNullOrEmpty(_config.ClusterHost))
  238.             {
  239.                 var _list = new List<PasteNodeModel>();
  240.                 var strs = _config.ClusterHost.Split(";");
  241.                 foreach (var str in strs)
  242.                 {
  243.                     if (!string.IsNullOrEmpty(str))
  244.                     {
  245.                         var its = str.Split(',');
  246.                         var _one = new PasteNodeModel { };
  247.                         _one.host = its[0];
  248.                         if (its.Length > 1)
  249.                         {
  250.                             int.TryParse(its[1], out var _id);
  251.                             _one.id = _id;
  252.                         }
  253.                         if (its.Length > 2)
  254.                         {
  255.                             int.TryParse(its[2], out var _group);
  256.                             _one.group = _group;
  257.                         }
  258.                         if (!_list.Contains(_one))
  259.                         {
  260.                             _list.Add(_one);
  261.                         }
  262.                     }
  263.                 }
  264.                 if (_list.Count > 0)
  265.                 {
  266.                     foreach (var _one in _list)
  267.                     {
  268.                         await AddNodeToList(_one);
  269.                     }
  270.                 }
  271.             }
  272.             if (!String.IsNullOrEmpty(_config.CurrentCode))
  273.             {
  274.                 SetNodeCode(_config.CurrentCode);
  275.             }
  276.             if (!String.IsNullOrEmpty(node_code))
  277.             {
  278.                 if (node_list != null && node_list.Count > 0)
  279.                 {
  280.                     await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
  281.                 }
  282.             }
  283.             else
  284.             {
  285.                 Console.WriteLine("begin to join cluster:" + node_list.Select(x => x.host).ToString());
  286.                 await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
  287.             }
  288.         }
  289.         /// <summary>
  290.         /// 需要确保先调用SetNodeCode 表示给自己打一个标记
  291.         /// 然后会启动一个任务去遍历集合,查找这个NodeCode,找到后就知道自己的HOST信息了
  292.         /// </summary>
  293.         public async void FindMe()
  294.         {
  295.             if (!String.IsNullOrEmpty(node_code))
  296.             {
  297.                 if (node_list != null && node_list.Count > 0)
  298.                 {
  299.                     await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
  300.                 }
  301.             }
  302.         }
  303.         /// <summary>
  304.         /// 开始查找自己 需要有节点集合(通过AddNodeToList赋值) 需要有当前节点得node_code(通过SetNodeCode赋值)
  305.         /// </summary>
  306.         /// <returns></returns>
  307.         public async Task<bool> StartFindMe()
  308.         {
  309.             if (!String.IsNullOrEmpty(node_code))
  310.             {
  311.                 if (node_list != null && node_list.Count > 0)
  312.                 {
  313.                     await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });
  314.                     return true;
  315.                 }
  316.             }
  317.             return false;
  318.         }
  319.         /// <summary>
  320.         /// 如果消息来自Master则更新我和Master的最后交互时间
  321.         /// 这个功能用于减少不必要的心跳资源浪费
  322.         /// </summary>
  323.         /// <param name="host"></param>
  324.         public void TargetMasterLast(string host)
  325.         {
  326.             if (_current_master != null)
  327.             {
  328.                 if (_current_master.host == host)
  329.                 {
  330.                     _current_master.last_time = DateTime.Now.ToUnixTimeSeconds();
  331.                     //Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} Node:{CurrentHost} Update Master Last Time!");
  332.                 }
  333.             }
  334.         }
  335.         /// <summary>
  336.         /// 确认对方的Master和我的是否一致!
  337.         /// 如果不一致,则我进去查找Master阶段
  338.         /// </summary>
  339.         /// <param name="host"></param>
  340.         public void ConfirmMaster(string host)
  341.         {
  342.             if (_current_master != null)
  343.             {
  344.                 if (_current_master.host != host)
  345.                 {
  346.                     _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
  347.                 }
  348.             }
  349.         }
  350.         /// <summary>
  351.         /// 拥有集群的地址列表,去找到自己
  352.         /// 这个是给内部调用的,请勿调用
  353.         /// </summary>
  354.         /// <param name="input"></param>
  355.         public async Task<PasteApiResponse> Find(PasteNodeModel input)
  356.         {
  357.             if (!String.IsNullOrEmpty(input.node_code))
  358.             {
  359.                 if (node_code == input.node_code)
  360.                 {
  361.                     //找到自己了
  362.                     _current_node = new PasteNodeModel
  363.                     {
  364.                         node_code = node_code,
  365.                         group = input.group,
  366.                         host = input.host,
  367.                         id = input.id,
  368.                     };
  369.                     await AddNodeToList(_current_node);
  370.                     await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
  371.                 }
  372.             }
  373.             return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  374.         }
  375.         /// <summary>
  376.         /// 定时检查
  377.         /// </summary>
  378.         /// <param name="sender"></param>
  379.         /// <param name="e"></param>
  380.         private async void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
  381.         {
  382.             try
  383.             {
  384.                 if (_current_node != null)
  385.                 {
  386.                     _tick_index++;
  387.                     //健康检查 被检查等!
  388.                     if (_tick_index % _config.TickSloveHealth == 0)
  389.                     {
  390.                         var _ago = DateTime.Now.ToUnixTimeSeconds() - _config.TickScanSloveHealth;
  391.                         if (CurrentIsMaster)
  392.                         {
  393.                             //健康检查 检查节点多久没通讯了
  394.                             if (node_list != null && node_list.Count > 0)
  395.                             {
  396.                                 var _post = new PasteMasterResponse { };
  397.                                 _post.master = _current_node;
  398.                                 _post.nodes = node_list?.ToArray();
  399.                                 var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);
  400.                                 try
  401.                                 {
  402.                                     await _semaphoreSlim.WaitAsync();
  403.                                     foreach (var _node in node_list)
  404.                                     {
  405.                                         if (_node.host != _current_node.host)
  406.                                         {
  407.                                             if (_node.last_time < _ago)
  408.                                             {
  409.                                                 var _api = await slove_get(_node, _config.ApiHealth);
  410.                                                 if (_api.success)
  411.                                                 {
  412.                                                     if (_api.node_count != node_list.Count || _api.master?.host != _current_master?.host)
  413.                                                     {
  414.                                                         //告知对方,你的集群数据有误!
  415.                                                         await slove_post(_node, _postBody, _config.ApiMaster);
  416.                                                     }
  417.                                                 }
  418.                                             }
  419.                                         }
  420.                                         else
  421.                                         {
  422.                                             _node.last_time = DateTime.Now.ToUnixTimeSeconds();//自己直接就是最新的
  423.                                         }
  424.                                     }
  425.                                 }
  426.                                 catch (Exception exl)
  427.                                 {
  428.                                     _logger.LogError("line.341" + exl.Message);
  429.                                 }
  430.                                 finally
  431.                                 {
  432.                                     _semaphoreSlim.Release();
  433.                                 }
  434.                                 //错误次数超过多少次的 执行移除操作!
  435.                                 var removes = node_list.Where(x => x.error_time > _config.RemoveByTime).ToList();
  436.                                 if (removes != null && removes.Count > 0)
  437.                                 {
  438.                                     foreach (var _node in removes)
  439.                                     {
  440.                                         await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.removenode, message = Newtonsoft.Json.JsonConvert.SerializeObject(_node) });
  441.                                     }
  442.                                 }
  443.                             }
  444.                         }
  445.                         else
  446.                         {
  447.                             if (_current_master != null)
  448.                             {
  449.                                 if (_current_master.last_time < _ago)
  450.                                 {
  451.                                     //Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} node:{_current_node?.host} master:{_current_master?.host} timeout!");
  452.                                     var _api = await slove_get(_current_master, _config.ApiHealth);
  453.                                     if (_api.success)
  454.                                     {
  455.                                         //我记录的master和master记录的master不一致
  456.                                         if (_api.node_count != node_list.Count || _api.master?.host != _current_master?.host)
  457.                                         {
  458.                                             //告知对方,你的集群数据有误!
  459.                                             await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
  460.                                         }
  461.                                     }
  462.                                     else
  463.                                     {
  464.                                         //master失联了
  465.                                         await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
  466.                                     }
  467.                                 }
  468.                             }
  469.                             else
  470.                             {
  471.                                 //选举?
  472.                                 await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
  473.                             }
  474.                         }
  475.                     }
  476.                 }
  477.                 if (_tick_index > 3600)
  478.                 {
  479.                     _tick_index = 0;
  480.                 }
  481.             }
  482.             catch (Exception exlelapsed)
  483.             {
  484.                 _logger.LogError("line.396" + exlelapsed.Message);
  485.             }
  486.         }
  487.         /// <summary>
  488.         /// 业务端调用 表示把消息打入到集群 发给所有节点或者某些节点
  489.         /// </summary>
  490.         /// <param name="msg"></param>
  491.         /// <param name="node_id"></param>
  492.         /// <param name="group"></param>
  493.         public void PushMsgToNode(string msg, int msg_type = 0, int node_id = 0, int group = 0, string name = "")
  494.         {
  495.             var _msg = new PasteSloveMessage
  496.             {
  497.                 body = msg,
  498.                 msg_type = msg_type,
  499.                 from = _current_node
  500.             };
  501.             var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
  502.             if (!voting)
  503.             {
  504.                 //如果当前在选举中,则暂存!
  505.                 _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_node, message = _msgbody, msg_type = msg_type, node_id = node_id, group = group, name = name });
  506.             }
  507.             else
  508.             {
  509.                 ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_node, message = _msgbody, msg_type = msg_type, node_id = node_id, group = group, name = name });
  510.             }
  511.         }
  512.         /// <summary>
  513.         /// 把消息发送给所有节点,包括自己
  514.         /// </summary>
  515.         /// <param name="msg"></param>
  516.         /// <param name="node_id"></param>
  517.         /// <param name="group"></param>
  518.         public void PushMsgToAll(string msg, int msg_type = 0)
  519.         {
  520.             var _msg = new PasteSloveMessage
  521.             {
  522.                 body = msg,
  523.                 msg_type = msg_type,
  524.                 from = _current_node
  525.             };
  526.             var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
  527.             if (!voting)
  528.             {
  529.                 //如果当前在选举中,则暂存!
  530.                 _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_all_node, message = _msgbody, msg_type = msg_type });
  531.             }
  532.             else
  533.             {
  534.                 ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_all_node, message = _msgbody, msg_type = msg_type });
  535.             }
  536.         }
  537.         /// <summary>
  538.         /// 发送消息给master所在的节点的业务处理,如果当前为master则变更channel,切换到给业务处理
  539.         /// </summary>
  540.         /// <param name="msg"></param>
  541.         public void PushMsgToMaster(string msg, int msg_type = 0)
  542.         {
  543.             var _msg = new PasteSloveMessage
  544.             {
  545.                 body = msg,
  546.                 from = _current_node,
  547.                 msg_type = msg_type
  548.             };
  549.             var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
  550.             if (CurrentIsMaster || _current_master == null)
  551.             {
  552.                 //切换队列
  553.                 ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { body = _msgbody });
  554.             }
  555.             else
  556.             {
  557.                 if (!voting)
  558.                 {
  559.                     _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_master, message = _msgbody, msg_type = msg_type });
  560.                 }
  561.                 else
  562.                 {
  563.                     ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_master, message = _msgbody, msg_type = msg_type });
  564.                 }
  565.             }
  566.         }
  567.         /// <summary>
  568.         /// 直接推送消息,主要作用在于立马反馈
  569.         /// 如果没有对象,则会发送给master
  570.         /// 可以从from中判断是不是回环了
  571.         /// </summary>
  572.         /// <param name="msg"></param>
  573.         /// <param name="msg_type"></param>
  574.         /// <param name="id"></param>
  575.         /// <param name="name"></param>
  576.         /// <param name="group"></param>
  577.         /// <param name="host"></param>
  578.         /// <returns></returns>
  579.         public async Task<bool> PushDirectionMsg(string msg, int msg_type = 0, int id = 0, string name = "", int group = 0, string host = "")
  580.         {
  581.             var _message = new PasteSloveMessage
  582.             {
  583.                 body = msg,
  584.                 msg_type = msg_type,
  585.             };
  586.             return await PushDirectionMsg(_message, id, name, group, host);
  587.         }
  588.         /// <summary>
  589.         /// 直接发送消息给某一个节点
  590.         /// 如果没有命中则发送给Master
  591.         /// 也就是说,可能发送给当前!注意不要回环了!!!
  592.         /// </summary>
  593.         /// <param name="message"></param>
  594.         /// <param name="id"></param>
  595.         /// <param name="name"></param>
  596.         /// <param name="group"></param>
  597.         /// <param name="host">示例http://192.168.1.5</param>
  598.         /// <returns>是否发送成功,发送给某一个节点了?</returns>
  599.         public async Task<bool> PushDirectionMsg(PasteSloveMessage message, int id = 0, string name = "", int group = 0, string host = "")
  600.         {
  601.             message.from = _current_node;
  602.             var _send = false;
  603.             if (id != 0)
  604.             {
  605.                 if (CurrentId == id)
  606.                 {
  607.                     message.to = _current_node;
  608.                     var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
  609.                     var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_postBody);
  610.                     await ChannelCluster.Writer.WriteAsync(_message);
  611.                     return true;
  612.                 }
  613.                 else
  614.                 {
  615.                     var find = node_list.Where(x => x.id == id).FirstOrDefault();
  616.                     if (find != null && find != default)
  617.                     {
  618.                         message.to = find;
  619.                         var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
  620.                         var _api = await slove_post(find, _postBody, _config.ApiMsg);
  621.                         return _api.success;
  622.                     }
  623.                 }
  624.             }
  625.             if (!string.IsNullOrEmpty(host) && node_list != null)
  626.             {
  627.                 var find = node_list.Where(x => x.host == host).FirstOrDefault();
  628.                 if (find != null && find != default)
  629.                 {
  630.                     message.to = find;
  631.                     var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
  632.                     var _api = await slove_post(find, _postBody, _config.ApiMsg);
  633.                     return _api.success;
  634.                 }
  635.             }
  636.             if (!string.IsNullOrEmpty(name) && node_list != null)
  637.             {
  638.                 var find = node_list.Where(x => x.name == name).FirstOrDefault();
  639.                 if (find != null && find != default)
  640.                 {
  641.                     message.to = find;
  642.                     var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
  643.                     var _api = await slove_post(find, _postBody, _config.ApiMsg);
  644.                     return _api.success;
  645.                 }
  646.             }
  647.             if (group != 0 && node_list != null)
  648.             {
  649.                 var find = node_list.Where(x => x.group == group).FirstOrDefault();
  650.                 if (find != null && find != default)
  651.                 {
  652.                     message.to = find;
  653.                     var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
  654.                     var _api = await slove_post(find, _postBody, _config.ApiMsg);
  655.                     return _api.success;
  656.                 }
  657.             }
  658.             if (!_send)
  659.             {
  660.                 if (CurrentIsMaster || IsSingle)
  661.                 {
  662.                     message.to = _current_node;
  663.                     await ChannelCluster.Writer.WriteAsync(message);
  664.                     return true;
  665.                 }
  666.                 else
  667.                 {
  668.                     if (_current_master != null)
  669.                     {
  670.                         message.to = _current_master;
  671.                         var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);
  672.                         var _api = await slove_post(_current_master, _postBody, _config.ApiMsg);
  673.                         return _api.success;
  674.                     }
  675.                 }
  676.             }
  677.             return _send;
  678.         }
  679.         /// <summary>
  680.         /// 如果当前不是master则不执行,直接抛弃,如果当前是master则执行
  681.         /// </summary>
  682.         /// <param name="msg"></param>
  683.         public async Task<bool> PushMsgToOnlyMaster(string msg, int msg_type = 0)
  684.         {
  685.             var _msg = new PasteSloveMessage
  686.             {
  687.                 body = msg,
  688.                 msg_type = msg_type,
  689.                 from = _current_node
  690.             };
  691.             var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);
  692.             if (CurrentIsMaster || _current_master == null)
  693.             {
  694.                 await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { body = _msgbody });
  695.                 return true;
  696.             }
  697.             return false;
  698.         }
  699.         /// <summary>
  700.         /// 处理节点中队列的消息
  701.         /// </summary>
  702.         private async void ActionSloveEvent()
  703.         {
  704.             try
  705.             {
  706.                 var _read = await _channel_slove.Reader.ReadAsync();
  707.                 if (_read != null && _read != default)
  708.                 {
  709.                     //_logger.LogInformation("Cluster.Event:" + Newtonsoft.Json.JsonConvert.SerializeObject(_read));
  710.                     switch (_read.action)
  711.                     {
  712.                         case PasteSloveEvent.votemaster:
  713.                             {
  714.                                 //告知别人,我是master
  715.                                 if (_current_node == null)
  716.                                 {
  717.                                     return;
  718.                                 }
  719.                                 _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
  720.                                 if (node_list != null && node_list.Count > 0)
  721.                                 {
  722.                                     var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_current_node);
  723.                                     //必须大于1,有2个及以上节点,才有master的说法
  724.                                     foreach (var _node in node_list)
  725.                                     {
  726.                                         if (_node.host != _current_node.host)
  727.                                         {
  728.                                             var _api = await slove_post(_node, _postBody, _config.ApiVote);
  729.                                             if (_api.success)
  730.                                             {
  731.                                                 //_node.last_time = DateTime.Now.ToUnixTimeSeconds();
  732.                                                 if (_api.master != null)
  733.                                                 {
  734.                                                     if (_api.master.host != _current_node.host)
  735.                                                     {
  736.                                                         //找到一个竞争者? 有节点说他才是master 而且时间比我小 我只好退位让贤
  737.                                                         _current_master = _api.master;
  738.                                                         break;
  739.                                                     }
  740.                                                 }
  741.                                             }
  742.                                         }
  743.                                     }
  744.                                     //选举完成了,我是不是master?
  745.                                     if (_current_master == null)
  746.                                     {
  747.                                         _current_master = _current_node;
  748.                                         await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
  749.                                     }
  750.                                 }
  751.                             }
  752.                             break;
  753.                         case PasteSloveEvent.scanmaster:
  754.                             {
  755.                                 if (_current_node == null)
  756.                                 {
  757.                                     return;
  758.                                 }
  759.                                 _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
  760.                                 if (node_list != null && node_list.Count > 0)
  761.                                 {
  762.                                     voting = true;
  763.                                     //必须大于1,有2个及以上节点,才有master的说法
  764.                                     foreach (var _node in node_list)
  765.                                     {
  766.                                         var _api = await slove_get(_node, _config.ApiAsk);
  767.                                         if (_api.success && _api.master != null)
  768.                                         {
  769.                                             _current_master = _api.master;
  770.                                             if (_current_node.host == _current_master.host)
  771.                                             {
  772.                                                 await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
  773.                                             }
  774.                                             break;
  775.                                         }
  776.                                     }
  777.                                     if (_current_master != null)
  778.                                     {
  779.                                         if (_current_node?.host != _current_master?.host)
  780.                                         {
  781.                                             //告知 master 我要加入集群
  782.                                             var _join = await slove_post(_current_master, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), _config.ApiJoinSlove);
  783.                                             if (!_join.success)
  784.                                             {
  785.                                                 //这里失败了,咋办??? 未完待续
  786.                                                 voting = true;
  787.                                                 _current_master = null;
  788.                                             }
  789.                                         }
  790.                                     }
  791.                                     else
  792.                                     {
  793.                                         //饶了一圈,全部不能访问
  794.                                         _current_master = _current_node;
  795.                                         voting = false;
  796.                                         await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
  797.                                     }
  798.                                 }
  799.                                 else
  800.                                 {
  801.                                     //节点信息不完整 还是只有这么一个节点? 未完待续
  802.                                     _current_master = _current_node;
  803.                                     voting = false;
  804.                                     await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
  805.                                 }
  806.                             }
  807.                             break;
  808.                         case PasteSloveEvent.suremaster:
  809.                             {
  810.                                 if (_current_node == null)
  811.                                 {
  812.                                     return;
  813.                                 }
  814.                                 if (node_list != null && node_list.Count > 0)
  815.                                 {
  816.                                     var _post = new PasteMasterResponse { };
  817.                                     _post.master = _current_node;
  818.                                     _post.nodes = node_list?.ToArray();
  819.                                     var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);
  820.                                     //必须大于1,有2个及以上节点,才有master的说法
  821.                                     foreach (var _node in node_list)
  822.                                     {
  823.                                         if (_node.host != _current_node.host)
  824.                                         {
  825.                                             var _api = await slove_post(_node, _postBody, _config.ApiMaster);
  826.                                         }
  827.                                     }
  828.                                     await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { msg_type = 1, body = _postBody, from = _current_node, time = DateTime.Now.ToUnixTimeSeconds() });
  829.                                 }
  830.                             }
  831.                             break;
  832.                         case PasteSloveEvent.msg_to_master:
  833.                             {
  834.                                 var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);
  835.                                 if (CurrentIsMaster)
  836.                                 {
  837.                                     _message.from = _current_node;
  838.                                     _message.to = _current_node;
  839.                                     await ChannelCluster.Writer.WriteAsync(_message);
  840.                                 }
  841.                                 else
  842.                                 {
  843.                                     //通过 api 发送给 master
  844.                                     if (_current_master != null)
  845.                                     {
  846.                                         _message.from = _current_node;
  847.                                         _message.to = _current_master;
  848.                                         var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
  849.                                         //如果发送失败,暂停业务,进行vote阶段
  850.                                         var _join = await slove_post(_current_master, _postBody, _config.ApiMsg);
  851.                                         if (!_join.success)
  852.                                         {
  853.                                             if (_read.try_time > 0)
  854.                                             {
  855.                                                 _read.try_time = _read.try_time - 1;
  856.                                                 //这里失败了,咋办??? 未完待续
  857.                                                 await _channel_slove.Writer.WriteAsync(_read);
  858.                                             }
  859.                                         }
  860.                                     }
  861.                                 }
  862.                             }
  863.                             break;
  864.                         case PasteSloveEvent.msg_to_node:
  865.                             {
  866.                                 if (node_list == null || node_list.Count == 0)
  867.                                 {
  868.                                     return;
  869.                                 }
  870.                                 var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);
  871.                                 if (_read.node_id != 0)
  872.                                 {
  873.                                     var nodes = node_list.Where(x => x.id == _read.node_id).ToList();
  874.                                     if (nodes != null && nodes.Count > 0)
  875.                                     {
  876.                                         foreach (var _node in nodes)
  877.                                         {
  878.                                             _message.to = _node;
  879.                                             var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
  880.                                             var _api = await slove_post(_node, _postBody, _config.ApiMsg);
  881.                                         }
  882.                                     }
  883.                                 }
  884.                                 if (_read.group != 0)
  885.                                 {
  886.                                     var nodes = node_list.Where(x => x.group == _read.group).ToList();
  887.                                     if (nodes != null && nodes.Count > 0)
  888.                                     {
  889.                                         foreach (var _node in nodes)
  890.                                         {
  891.                                             _message.to = _node;
  892.                                             var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
  893.                                             var _api = await slove_post(_node, _postBody, _config.ApiMsg);
  894.                                         }
  895.                                     }
  896.                                 }
  897.                                 if (_read.group == 0 && _read.node_id == 0)
  898.                                 {
  899.                                     foreach (var _node in node_list)
  900.                                     {
  901.                                         _message.to = _node;
  902.                                         var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
  903.                                         var _api = await slove_post(_node, _postBody, _config.ApiMsg);
  904.                                     }
  905.                                 }
  906.                             }
  907.                             break;
  908.                         case PasteSloveEvent.msg_to_all_node:
  909.                             {
  910.                                 if (node_list == null || node_list.Count == 0)
  911.                                 {
  912.                                     return;
  913.                                 }
  914.                                 var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);
  915.                                 foreach (var _node in node_list)
  916.                                 {
  917.                                     _message.to = _node;
  918.                                     var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);
  919.                                     var _api = await slove_post(_node, _postBody, _config.ApiMsg);
  920.                                 }
  921.                             }
  922.                             break;
  923.                         case PasteSloveEvent.removenode:
  924.                             {
  925.                                 if (CurrentIsMaster)
  926.                                 {
  927.                                     var _node = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteNodeModel>(_read.message);
  928.                                     await RemoveCurrentNode(_node);
  929.                                     //下发群发消息
  930.                                     await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
  931.                                     var _messge = new PasteSloveMessage
  932.                                     {
  933.                                         body = _read.message,
  934.                                         from = _current_node,
  935.                                         msg_type = 4,
  936.                                     };
  937.                                     await ChannelCluster.Writer.WriteAsync(_messge);
  938.                                 }
  939.                             }
  940.                             break;
  941.                         case PasteSloveEvent.unziplist:
  942.                             {
  943.                                 if (ziplist != null && ziplist.Count > 0)
  944.                                 {
  945.                                     var _count = ziplist.Count;
  946.                                     Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}将从队列ZIPLIST中发送{ziplist.Count}条消息");
  947.                                     for (var k = 0; k < _count; k++)
  948.                                     {
  949.                                         if (!voting)
  950.                                         {
  951.                                             if (ziplist.Count > 0)
  952.                                             {
  953.                                                 var _first = ziplist[0];
  954.                                                 await _channel_slove.Writer.WriteAsync(_first);
  955.                                                 ziplist.RemoveAt(0);
  956.                                             }
  957.                                         }
  958.                                     }
  959.                                 }
  960.                                 break;
  961.                             }
  962.                         case PasteSloveEvent.findnode:
  963.                             {
  964.                                 var _find = false;
  965.                                 if (!string.IsNullOrEmpty(node_code))
  966.                                 {
  967.                                     if (node_list != null && node_list.Count > 0)
  968.                                     {
  969.                                         foreach (var _node in node_list)
  970.                                         {
  971.                                             if (String.IsNullOrEmpty(_node.node_code))
  972.                                             {
  973.                                                 var _build = new PasteNodeModel
  974.                                                 {
  975.                                                     group = _node.group,
  976.                                                     id = _node.id,
  977.                                                     host = _node.host,
  978.                                                     node_code = node_code
  979.                                                 };
  980.                                                 var _api = await slove_post(_build, Newtonsoft.Json.JsonConvert.SerializeObject(_build), _config.ApiFind);
  981.                                                 if (_api.success)
  982.                                                 {
  983.                                                     if (_api.node != null)
  984.                                                     {
  985.                                                         if (_api.node.node_code == node_code)
  986.                                                         {
  987.                                                             _find = true;
  988.                                                             break;
  989.                                                         }
  990.                                                     }
  991.                                                 }
  992.                                             }
  993.                                         }
  994.                                     }
  995.                                 }
  996.                                 if (_find)
  997.                                 {
  998.                                     await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
  999.                                 }
  1000.                             }
  1001.                             break;
  1002.                     }
  1003.                 }
  1004.             }
  1005.             catch (Exception exl)
  1006.             {
  1007.                 _logger.LogError(exl.Message);
  1008.                 await Task.Delay(1000);
  1009.             }
  1010.             finally
  1011.             {
  1012.                 ActionSloveEvent();
  1013.             }
  1014.         }
  1015.         /// <summary>
  1016.         /// 知道当前节点信息后,注册到节点或者更新当前节点信息
  1017.         /// </summary>
  1018.         /// <param name="host"></param>
  1019.         /// <param name="id"></param>
  1020.         /// <param name="group"></param>
  1021.         public async void Register(string host, int id, int group = 0, string name = "")
  1022.         {
  1023.             Console.WriteLine($"Register At:{host}");
  1024.             var _current = new PasteNodeModel
  1025.             {
  1026.                 group = group,
  1027.                 host = host,
  1028.                 id = id,
  1029.                 name = name,
  1030.                 last_time = DateTime.Now.ToUnixTimeSeconds()
  1031.             };
  1032.             _current_node = _current;
  1033.             await AddNodeToList(_current);
  1034.             //准备问询
  1035.             await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
  1036.         }
  1037.         /// <summary>
  1038.         /// 有新的节点加入 可以是业务上读取节点列表,然后遍历调用这个
  1039.         /// </summary>
  1040.         /// <param name="input"></param>
  1041.         public async void Join(PasteNodeModel input)
  1042.         {
  1043.             await AddNodeToList(input);
  1044.             //群发
  1045.             if (CurrentIsMaster)
  1046.             {
  1047.                 await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });
  1048.                 //告知系统有新的节点
  1049.                 var _messge = new PasteSloveMessage
  1050.                 {
  1051.                     body = Newtonsoft.Json.JsonConvert.SerializeObject(input),
  1052.                     from = _current_node,
  1053.                     msg_type = 3,
  1054.                 };
  1055.                 await ChannelCluster.Writer.WriteAsync(_messge);
  1056.             }
  1057.         }
  1058.         /// <summary>
  1059.         /// 添加节点
  1060.         /// </summary>
  1061.         /// <param name="input"></param>
  1062.         /// <param name="scan">加入后,是否搜索master</param>
  1063.         public async Task AddNodeToList(PasteNodeModel input)
  1064.         {
  1065.             try
  1066.             {
  1067.                 await _semaphoreSlim.WaitAsync();
  1068.                 //Console.WriteLine($"--join--:{input.host}");
  1069.                 if (node_list == null)
  1070.                 {
  1071.                     node_list = new List<PasteNodeModel> { input };
  1072.                 }
  1073.                 else
  1074.                 {
  1075.                     var find = node_list.Where(x => x.host == input.host).FirstOrDefault();
  1076.                     if (find != null && find != default)
  1077.                     {
  1078.                         find.id = input.id;
  1079.                         find.group = input.group;
  1080.                     }
  1081.                     else
  1082.                     {
  1083.                         node_list.Add(input);
  1084.                     }
  1085.                 }
  1086.             }
  1087.             catch (Exception ex)
  1088.             {
  1089.                 _logger.LogError(ex.Message);
  1090.             }
  1091.             finally
  1092.             {
  1093.                 _semaphoreSlim.Release();
  1094.             }
  1095.         }
  1096.         /// <summary>
  1097.         /// 扫描master 这个发生在加入新的节点信息后 或者启动后
  1098.         /// </summary>
  1099.         public async void ScanMaster()
  1100.         {
  1101.             await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
  1102.         }
  1103.         /// <summary>
  1104.         /// 健康检查,返回当前信息
  1105.         /// </summary>
  1106.         /// <returns></returns>
  1107.         public PasteApiResponse Health()
  1108.         {
  1109.             return new PasteApiResponse { success = true, master = _current_master, node = _current_node, node_count = node_list != null ? node_list.Count : 0 };
  1110.         }
  1111.         /// <summary>
  1112.         /// 被问询 当前有没有master
  1113.         /// </summary>
  1114.         public PasteApiResponse Ask()
  1115.         {
  1116.             if (_current_node != null)
  1117.             {
  1118.                 if (_current_master == null)
  1119.                 {
  1120.                     _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
  1121.                     //_current_master = _current_node;
  1122.                     //CurrentIsMaster = true;
  1123.                     //进入分发流程,告知我是master
  1124.                     _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.votemaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });
  1125.                 }
  1126.             }
  1127.             //if (_current_node != null)
  1128.             //{
  1129.             //    _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();
  1130.             //}
  1131.             return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  1132.         }
  1133.         /// <summary>
  1134.         /// 集群间的消息
  1135.         /// </summary>
  1136.         /// <param name="msg"></param>
  1137.         /// <returns></returns>
  1138.         public async Task<PasteApiResponse> Msg(PasteSloveMessage msg)
  1139.         {
  1140.             await ChannelCluster.Writer.WriteAsync(msg);
  1141.             msg_count++;
  1142.             if (msg.msg_type == 2)
  1143.             {
  1144.             }
  1145.             return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  1146.         }
  1147.         /// <summary>
  1148.         /// 有master分发消息,说他是master
  1149.         /// </summary>
  1150.         /// <param name="input"></param>
  1151.         /// <returns></returns>
  1152.         public async Task<PasteApiResponse> Vote(PasteNodeModel input)
  1153.         {
  1154.             if (_current_node != null)
  1155.             {
  1156.                 //排除当前节点问当前节点
  1157.                 if (_current_node.host != input.host)
  1158.                 {
  1159.                     if (_current_master == null)
  1160.                     {
  1161.                         //当前没有master 那来的就是master吧
  1162.                         _current_master = input;
  1163.                     }
  1164.                     else
  1165.                     {
  1166.                         //当前有master
  1167.                         if (_current_master.host != input.host)
  1168.                         {
  1169.                             if (_current_master.vote_time < input.vote_time)
  1170.                             {
  1171.                                 var _api = await slove_get(_current_master, _config.ApiHealth);
  1172.                                 if (_api.success)
  1173.                                 {
  1174.                                     return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  1175.                                 }
  1176.                                 else
  1177.                                 {
  1178.                                     _current_master = input;
  1179.                                 }
  1180.                             }
  1181.                         }
  1182.                         return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  1183.                     }
  1184.                 }
  1185.             }
  1186.             //返回 哪个是master
  1187.             return new PasteApiResponse { success = true, master = input, node = _current_node };
  1188.         }
  1189.         /// <summary>
  1190.         /// master选举完成,master广播说他是master
  1191.         /// </summary>
  1192.         /// <param name="input"></param>
  1193.         /// <returns></returns>
  1194.         public async Task<PasteApiResponse> Master(PasteMasterResponse input)
  1195.         {
  1196.             var _old_is_master = CurrentIsMaster;
  1197.             _current_master = input.master;
  1198.             if (input.nodes != null)
  1199.             {
  1200.                 foreach (var _node in input.nodes)
  1201.                 {
  1202.                     await AddNodeToList(_node);
  1203.                 }
  1204.             }
  1205.             voting = false;
  1206.             //如果队列有暂存消息,则发送!
  1207.             await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.unziplist });
  1208.             var _new_is_master = CurrentIsMaster;
  1209.             if (_old_is_master && !_new_is_master)
  1210.             {
  1211.                 await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { msg_type = 2, body = Newtonsoft.Json.JsonConvert.SerializeObject(input), from = _current_node });
  1212.             }
  1213.             //返回 哪个是master
  1214.             return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  1215.         }
  1216.         /// <summary>
  1217.         ///
  1218.         /// </summary>
  1219.         /// <param name="input"></param>
  1220.         /// <returns></returns>
  1221.         public async Task<PasteApiResponse> Leave(PasteNodeModel input)
  1222.         {
  1223.             if (_current_node != null)
  1224.             {
  1225.                 if (_current_node.host != input.host)
  1226.                 {
  1227.                     //谁离线了
  1228.                     //如果自己是master 则应该是节点离线 通知其他节点,节点列表变更
  1229.                     //如果自己是node 离线的是master 则自己进去选择
  1230.                     if (CurrentIsMaster)
  1231.                     {
  1232.                         if (node_list != null && node_list.Count > 0)
  1233.                         {
  1234.                             await RemoveCurrentNode(input);
  1235.                             if (node_list.Count > 0)
  1236.                             {
  1237.                                 var _post = new PasteMasterResponse { };
  1238.                                 _post.master = _current_node;
  1239.                                 _post.nodes = node_list?.ToArray();
  1240.                                 var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);
  1241.                                 foreach (var _node in node_list)
  1242.                                 {
  1243.                                     if (_node.host != _current_node.host && _node.host != input.host)
  1244.                                     {
  1245.                                         var _api = await slove_post(_node, _postBody, _config.ApiMaster);
  1246.                                     }
  1247.                                 }
  1248.                             }
  1249.                         }
  1250.                         //告知系统有新的节点
  1251.                         var _messge = new PasteSloveMessage
  1252.                         {
  1253.                             body = Newtonsoft.Json.JsonConvert.SerializeObject(input),
  1254.                             from = _current_node,
  1255.                             msg_type = 4,
  1256.                         };
  1257.                         await ChannelCluster.Writer.WriteAsync(_messge);
  1258.                     }
  1259.                     else
  1260.                     {
  1261.                         if (_current_master != null && input.host == _current_master.host)
  1262.                         {
  1263.                             _current_master = null;
  1264.                             await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
  1265.                         }
  1266.                     }
  1267.                 }
  1268.             }
  1269.             return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  1270.         }
  1271.         /// <summary>
  1272.         ///
  1273.         /// </summary>
  1274.         /// <param name="input"></param>
  1275.         /// <returns></returns>
  1276.         public async Task<PasteApiResponse> Remove(PasteNodeModel input)
  1277.         {
  1278.             if (_current_node != null)
  1279.             {
  1280.                 if (_current_node.host == input.host)
  1281.                 {
  1282.                     //我居然被移除了,赶紧加回去
  1283.                     await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });
  1284.                 }
  1285.                 await RemoveCurrentNode(input);
  1286.             }
  1287.             return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  1288.         }
  1289.         /// <summary>
  1290.         /// 移除节点
  1291.         /// </summary>
  1292.         /// <param name="input"></param>
  1293.         /// <returns></returns>
  1294.         private async Task RemoveCurrentNode(PasteNodeModel input)
  1295.         {
  1296.             try
  1297.             {
  1298.                 await _semaphoreSlim.WaitAsync();
  1299.                 if (node_list != null)
  1300.                 {
  1301.                     var find = node_list.Where(x => x.host == input.host).FirstOrDefault();
  1302.                     if (find != null && find != default)
  1303.                     {
  1304.                         node_list.Remove(find);
  1305.                     }
  1306.                 }
  1307.             }
  1308.             catch (Exception exl)
  1309.             {
  1310.                 _logger.LogError(exl.Message);
  1311.             }
  1312.             finally
  1313.             {
  1314.                 _semaphoreSlim.Release();
  1315.             }
  1316.         }
  1317.         /// <summary>
  1318.         /// 打印当前状态
  1319.         /// </summary>
  1320.         /// <returns></returns>
  1321.         public string Status()
  1322.         {
  1323.             return $"总节点数:{node_list?.Count} Master:{_current_master?.host} Current:{_current_node?.host} Count:{msg_count}";
  1324.         }
  1325.         /// <summary>
  1326.         /// 打印当前状态
  1327.         /// </summary>
  1328.         /// <returns></returns>
  1329.         public PasteApiResponse State()
  1330.         {
  1331.             return new PasteApiResponse { success = true, master = _current_master, node = _current_node };
  1332.         }
  1333.         /// <summary>
  1334.         /// 离线 告诉除了自己的第一个节点,我要离线了!
  1335.         /// </summary>
  1336.         private async Task _leave()
  1337.         {
  1338.             if (CurrentIsMaster)
  1339.             {
  1340.                 if (node_list != null && node_list.Count > 0)
  1341.                 {
  1342.                     if (_current_node != null)
  1343.                     {
  1344.                         foreach (var _node in node_list)
  1345.                         {
  1346.                             if (_node.host != _current_node?.host)
  1347.                             {
  1348.                                 var _api = await slove_post(_node, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), $"{_config.ApiLeave}");
  1349.                                 if (_api.success)
  1350.                                 {
  1351.                                     break;
  1352.                                 }
  1353.                             }
  1354.                         }
  1355.                     }
  1356.                 }
  1357.             }
  1358.             else
  1359.             {
  1360.                 if (_current_master != null && _current_node != null)
  1361.                 {
  1362.                     await slove_post(_current_master, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), $"{_config.ApiLeave}");
  1363.                 }
  1364.             }
  1365.         }
  1366.         #region
  1367.         /// <summary>
  1368.         ///
  1369.         /// </summary>
  1370.         /// <param name="node"></param>
  1371.         /// <param name="path"></param>
  1372.         /// <returns></returns>
  1373.         private async Task<PasteApiResponse> slove_get(PasteNodeModel node, string path)
  1374.         {
  1375.             var _read = await get($"{node.host}{path}", node.host);
  1376.             if (_read.code == 200)
  1377.             {
  1378.                 if (node.error_time > 0)
  1379.                 {
  1380.                     node.error_time = 0;
  1381.                 }
  1382.                 node.last_time = DateTime.Now.ToUnixTimeSeconds();
  1383.                 return Newtonsoft.Json.JsonConvert.DeserializeObject<PasteApiResponse>(_read.response);
  1384.             }
  1385.             else
  1386.             {
  1387.                 node.error_time++;
  1388.                 return new PasteApiResponse { success = false };
  1389.             }
  1390.         }
  1391.         /// <summary>
  1392.         ///
  1393.         /// </summary>
  1394.         /// <param name="node"></param>
  1395.         /// <param name="body"></param>
  1396.         /// <param name="path"></param>
  1397.         /// <returns></returns>
  1398.         private async Task<PasteApiResponse> slove_post(PasteNodeModel node, string body, string path)
  1399.         {
  1400.             var _read = await post($"{node.host}{path}", body, node.host);
  1401.             if (_read.code == 200)
  1402.             {
  1403.                 if (node.error_time > 0)
  1404.                 {
  1405.                     node.error_time = 0;
  1406.                 }
  1407.                 node.last_time = DateTime.Now.ToUnixTimeSeconds();
  1408.                 return Newtonsoft.Json.JsonConvert.DeserializeObject<PasteApiResponse>(_read.response);
  1409.             }
  1410.             else
  1411.             {
  1412.                 node.error_time++;
  1413.                 return new PasteApiResponse { success = false };
  1414.             }
  1415.         }
  1416.         /// <summary>
  1417.         ///
  1418.         /// </summary>
  1419.         /// <param name="url"></param>
  1420.         /// <returns></returns>
  1421.         private async Task<(int code, string response)> get(string url, string host)
  1422.         {
  1423.             try
  1424.             {
  1425.                 var request = new HttpRequestMessage(HttpMethod.Get, url);
  1426.                 request.Headers.Add("slovetoken", _config.SloveToken);
  1427.                 if (_current_node != null)
  1428.                 {
  1429.                     request.Headers.Add("fromnode", _current_node.host);
  1430.                 }
  1431.                 if (_current_master != null)
  1432.                 {
  1433.                     request.Headers.Add("masternode", _current_master.host);
  1434.                 }
  1435.                 request.Headers.Add("ContentType", "application/json;charset=utf-8");
  1436.                 var client = _httpClientFactory.CreateClient();
  1437.                 client.Timeout = TimeSpan.FromSeconds(4);
  1438.                 client.DefaultRequestHeaders.Add("ContentType", "application/json;charset=utf-8");
  1439.                 var response = await client.SendAsync(request);
  1440.                 using var stream = new StreamReader(await response.Content.ReadAsStreamAsync(), Encoding.UTF8);
  1441.                 var backstream = stream.ReadToEnd();
  1442.                 if (response.StatusCode == HttpStatusCode.OK)
  1443.                 {
  1444.                     _logger.LogInformation(backstream);
  1445.                 }
  1446.                 else
  1447.                 {
  1448.                     _logger.LogError(backstream);
  1449.                 }
  1450.                 return ((int)response.StatusCode, backstream);
  1451.             }
  1452.             catch (Exception exl)
  1453.             {
  1454.                 _logger.LogError(exl.Message);
  1455.                 return (500, exl.Message);
  1456.             }
  1457.         }
  1458.         /// <summary>
  1459.         ///
  1460.         /// </summary>
  1461.         /// <param name="url"></param>
  1462.         /// <param name="postdata"></param>
  1463.         /// <returns></returns>
  1464.         private async Task<(int code, string response)> post(string url, string postdata, string host)
  1465.         {
  1466.             try
  1467.             {
  1468.                 using (HttpContent httpcontent = new StringContent(postdata))
  1469.                 {
  1470.                     httpcontent.Headers.Add("slovetoken", _config.SloveToken);
  1471.                     if (_current_node != null)
  1472.                     {
  1473.                         httpcontent.Headers.Add("fromnode", _current_node.host);
  1474.                     }
  1475.                     if (_current_master != null)
  1476.                     {
  1477.                         httpcontent.Headers.Add("masternode", _current_master.host);
  1478.                     }
  1479.                     httpcontent.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json;chartset=utf-8");
  1480.                     var client = _httpClientFactory.CreateClient();
  1481.                     client.Timeout = TimeSpan.FromSeconds(4);
  1482.                     client.DefaultRequestHeaders.Referrer = new Uri(url);
  1483.                     var response = await client.PostAsync(url, httpcontent);
  1484.                     var backstream = await response.Content.ReadAsStringAsync();
  1485.                     if (response.StatusCode != HttpStatusCode.OK && response.StatusCode != HttpStatusCode.NoContent && response.StatusCode != HttpStatusCode.Created)
  1486.                     {
  1487.                         _logger.LogError($"URL:{url} CODE:{response.StatusCode} POST:{postdata} BACK:{backstream}");
  1488.                     }
  1489.                     return ((int)response.StatusCode, backstream);
  1490.                 }
  1491.             }
  1492.             catch (Exception exl)
  1493.             {
  1494.                 _logger.LogError(exl.Message);
  1495.                 return (500, exl.Message);
  1496.             }
  1497.         }
  1498.         /// <summary>
  1499.         ///
  1500.         /// </summary>
  1501.         public void Dispose()
  1502.         {
  1503.             //离开集合
  1504.             _leave().Wait();
  1505.             if (_timer != null)
  1506.             {
  1507.                 _timer.Stop();
  1508.                 _timer.Dispose();
  1509.             }
  1510.             //Task.CompletedTask;
  1511.         }
  1512.         #endregion
  1513.         //是否要压入队列
  1514.         //统计数据打印 某一个消息的次数!
  1515.     }
  1516. }
复制代码
来源:https://www.cnblogs.com/pastespider/p/18244253
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具