翼度科技»论坛 编程开发 .net 查看内容

开源:Taurus.DTS 微服务分布式任务框架,支持即时任务、延时任务、Cron表

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
前言:

在发布完:开源:Taurus.DTC 微服务分布式事务框架,支持 .Net 和 .Net Core 双系列版本,之后想想,好像除了事务外,感觉里面多了一个任务发布订阅的基础功能。
本想既然都有了基础发布订阅功能了,那要不要顺带加上延时发布功能呢?加上了会不会让事务组件不纯了?
经过一翻深思,是在其上补上功能,还是,重新写一个组件,起初起名是个难题,因为 DTC 也可以是 Distributed Task Cxxxxx, 组件重名了?
经过一翻英文大作战,找到了:Distributed Task Scheduler,简写可以是DTS了,才开始重启一个组件。
于是就有了这个Taurus.DTS 任务组件,而且功能除了原有的即时任务发布订阅,和本来想加上的延时任务,后面又补上了基于Cron表达式的定时任务,和广播群发任务。
经过一翻大作战,有 Taurus.DTC 的基础代码作底层支持,花了大几天,解决了各种疑难杂症之后,终于出来了。
1、开源地址:

https://github.com/cyq1162/Taurus.DTS

2、Nuget 包引用ID:

由于 CYQ.Data Orm 组件本身支持10多种数据库,因此提供的包,只根据消息队列的需要分拆提供。
默认Taurus.DTS 支持同时使用 RabbitMQ 和 Kafka 两种消息队列。
如果一个项目中只用RabbitMQ,则引入 Tarurus.DTS.RabbitMQ,减少对 Kafka 的依赖包。

编绎的版本:支持太多,发布是个苦力活:
  1. .Net 系列:由于引用依赖关系(RabbitMQ最低是 4.0,Kafka最低是 4.5);
  2. .Net Core系列、支持 2.1 到 8.0 版本及以后。
  3. Standard 标准库:支持2.1 及以后。
复制代码

3、Taurus.DTS 微服务分布式任务框架基础说明:

基础说明:
  1. 1、框架分为: Client(客户端,即任务发起端)和 Server(服务端,即方法订阅方)。
  2. 2、框架支持:即时任务、延时任务、Cron表达式任务定时任务、广播任务,四种方式。
  3. 3、项目需要配置的参数:1、数据库(可选);2、MQ(必选)。
复制代码
数据存储:

可选择数据库(MSSQL、MySql、Oracle、PostgreSql 等 CYQ.Data 所支持的10多种数据库之一)
MSSQL配置示例如下:
  1. {
  2.   "ConnectionStrings": {
  3.     "DTS.Server.Conn": "server=.;database=MSLog;uid=sa;pwd=123456"
  4.   }
  5. }
复制代码
消息队列:

目前消息队列支持 RabbitMQ 或者 Kafka(配置其中一方即可):
  1. {
  2.   "AppSettings": {
  3.   "DTS.Server.Rabbit":"127.0.0.1;guest;guest;/",//ip;username;password;virtualpath;
  4.   "DTS.Server.Kafka":"127.0.0.1:9092"
  5.   }
  6. }
复制代码
以上配置为Server端,客户端更改 Server 为 Client 即可。
4、Server 端 使用示例:

1、Nuget 搜索 Taurus.DTS 引入工程项目中。
2、如果是 ASP.Net Core 程序:Program 或 Startup 添加服务使用引入:
  1.   services.AddTaurusDts(); // 服务添加。
  2.   app.UseTaurusDts(TaskStartType.Server); //服务使用,启用服务端
复制代码
3、appsettings.json 配置基本属性:
  1.   {
  2.   "ConnectionStrings": {
  3.     "DTS.Server.Conn": "host=localhost;port=3306;database=cyqdata;uid=root;pwd=123456;Convert Zero Datetime=True;"
  4.   },
  5.   "AppSettings": {
  6.     "DTS.Server.Rabbit": "127.0.0.1;guest;guest;/" //IP;UserName;Password;VirtualPaath
  7. }
复制代码
4、选择数据库对应的依赖组件,如MySql,可以:
  1. Nuget 上可以搜索 MySql.Data 、或者 CYQ.Data.MySql (会自动引入MySql.Data)  都可, 引入项目即可。
复制代码
5、代码编写,可以参考源码中提供的示例代码,如下为控制台示例代码:
  1. using System;
  2. using Taurus.Plugin.DistributedTask;
  3. namespace Console_App_Server {
  4. internal class Program
  5. {
  6.      static void Main(string[] args)
  7.     {
  8.         DTSConfig.Server.Rabbit = "127.0.0.1;guest;guest;/";
  9.         //DTSConfig.Server.Kafka = "127.0.0.1:9092;";
  10.         //DTSConfig.Server.Conn = DTSConfig.Client.Conn;
  11.         DTSConfig.ProjectName = "ConsoleApp5";
  12.         DTS.Server.Start();//start client and server
  13.         Console.WriteLine("---------------------------------------");
  14.         Console.ReadLine();
  15.     }
  16. }
  17. /// <summary>
  18. /// 服务端 server class need to public
  19. /// </summary>
  20. public class Server
  21. {
  22.     [DTSSubscribe("DoInstantTask")]
  23.     public static bool A(DTSSubscribePara para)
  24.     {
  25.         para.CallBackContent = "show you a.";
  26.         return true;
  27.     }
  28.     [DTSSubscribe("DoDelayTask")]
  29.     private static bool B(DTSSubscribePara para)
  30.     {
  31.         para.CallBackContent = "show you b.";
  32.         return true;
  33.     }
  34.     [DTSSubscribe("DoCronTask")]
  35.     private static bool C(DTSSubscribePara para)
  36.     {
  37.         para.CallBackContent = "show you c.";
  38.         return true;
  39.     }
  40.     /// <summary>
  41.     /// 定时任务
  42.     /// </summary>
  43.     [DTSSubscribe("DoBroadastTask")]
  44.     private static bool TimerTask(DTSSubscribePara para)
  45.     {
  46.         para.CallBackContent = "show you d.";
  47.         return true;
  48.     }
  49. }
  50. }
复制代码
5、Client 端 使用示例:

1、Nuget 搜索 Taurus.DTS 引入工程项目中。
2、如果是ASP.Net Core 程序:Program 或 Startup 添加服务使用引入:
  1.   services.AddTaurusDts(); // 服务添加
  2.   app.UseTaurusDts(StartType.Client); //服务使用,启用服务端
复制代码
3、appsettings.json 配置基本属性:
  1.   {
  2.   "ConnectionStrings": {
  3.     "DTS.Client.Conn": "host=localhost;port=3306;database=cyqdata;uid=root;pwd=123456;Convert Zero Datetime=True;"
  4.   },
  5.   "AppSettings": {
  6.     "DTS.Client.Rabbit": "127.0.0.1;guest;guest;/" //IP;UserName;Password;VirtualPaath
  7. }
复制代码
4、选择数据库对应的依赖组件,如MySql,可以:
  1. Nuget 上可以搜索 MySql.Data 、或者 CYQ.Data.MySql (会自动引入MySql.Data)  都可, 引入项目即可。
复制代码
5、代码编写,可以参考源码中提供的示例代码,如下为控制台示例代码:
  1. using System;
  2. using System.Threading;
  3. using Taurus.Plugin.DistributedTask;
  4. namespace Console_App_Client {
  5.   internal class Program
  6.   {
  7.   
  8.     static void Main(string[] args)
  9.     {
  10.    
  11.         DTSConfig.Client.IsPrintTraceLog = false;
  12.         //AppConfig.Redis.Servers = "127.0.0.1:6379";
  13.         DTSConfig.Client.Rabbit = "127.0.0.1;guest;guest;/";
  14.         //DTSConfig.Client.Kafka = "127.0.0.1:9092;";
  15.         DTSConfig.Client.Conn = "server=.;database=mslog;uid=sa;pwd=123456";
  16.         DTSConfig.ProjectName = "ConsoleApp5";
  17.         DTS.Client.Start();//start client and server
  18.         
  19.         Console.WriteLine("---------------------------------------");
  20.         Console.WriteLine("1-InstantTask、2-DelayTask(1Minutes)、3-CronTask、4-DeleteCronTask、5-BroadastTask");
  21.         Console.WriteLine("Input :1、2、3、4、5,Press Enter.");
  22.         while (true)
  23.         {
  24.             string line = Console.ReadLine();
  25.             try
  26.             {
  27.                 Client.Run(int.Parse(line));
  28.             }
  29.             catch(Exception err)
  30.             {
  31.                 Console.WriteLine(err.Message);
  32.             }
  33.             
  34.         }
  35.     }
  36. }
  37. /// <summary>
  38. /// 客户端 client class need to public if has callback method.
  39. /// </summary>
  40. public class Client
  41. {
  42.     public static void Run(int i)
  43.     {
  44.         if (i == 2)
  45.         {
  46.             //发布一个延时1分钟的任务
  47.             DTS.Client.Delay.PublishAsync(1, "i publish a delay task.", "DoDelayTask", "DelayCallBack");
  48.             Console.WriteLine("Wait for 1 minute...");
  49.         }
  50.         else if (i == 3)
  51.         {
  52.             //发布一个秒在30时的循环任务。
  53.             DTS.Client.Cron.PublishAsync("10,30,50 * * * * ?", "i publish a timer task with cron express.", "DoCronTask", "CronCallBack");
  54.             Console.WriteLine("Wait for execute task when second is 10,30,50...");
  55.         }
  56.         else if (i == 4)
  57.         {
  58.             //发布一个秒在30时的循环任务。
  59.             DTS.Client.Cron.DeleteAsync("DoCronTask", null, "CronCallBack");
  60.         }
  61.         else if (i == 5)
  62.         {
  63.             //发布一个广播任务
  64.             DTS.Client.Broadast.PublishAsync("i publish a task for all server.", "DoBroadastTask", "BroadastCallBack");
  65.         }
  66.         else
  67.         {
  68.             for (int k = 0; k < 1; k++)
  69.             {
  70.                 //发布一个即时任务
  71.                 DTS.Client.Instant.PublishAsync("i publish a task instantly.", "DoInstantTask", "InstantCallBack");
  72.                 Console.WriteLine(k);
  73.             }
  74.             
  75.         }
  76.     }
  77.     [DTSCallBack("InstantCallBack")]
  78.     [DTSCallBack("DelayCallBack")]
  79.     [DTSCallBack("CronCallBack")]
  80.     [DTSCallBack("BroadastCallBack")]
  81.     private static void OnCallBack(DTSCallBackPara para)
  82.     {
  83.         Console.WriteLine("Client callback : " + para.TaskType + " - " + para.CallBackKey + " - " + para.CallBackContent);
  84.     }
  85. }
  86. }
复制代码
6、Demo 运行示例:

demo 地址:https://github.com/cyq1162/Taurus.DTS/tree/master/demo
启动运行截图:

 输入1,发布即时任务:

7、其它:CYQ.Data 支持的数据库链接语句示例
  1. ###--------------------------------------------------------###
  2.    Txt::  Txt Path=E:\
  3.    Xml::  Xml Path=E:\
  4. Access::  Provider=Microsoft.Jet.OLEDB.4.0; Data Source=E:\cyqdata.mdb
  5. Sqlite::  Data Source=E:\cyqdata.db;failifmissing=false;
  6. MySql::  host=localhost;port=3306;database=cyqdata;uid=root;pwd=123456;Convert Zero Datetime=True;
  7. Mssql::  server=.;database=cyqdata;uid=sa;pwd=123456;provider=mssql;
  8. Sybase::  data source=127.0.0.1;port=5000;database=cyqdata;uid=sa;pwd=123456;provider=sybase;
  9. Postgre:  server=localhost;uid=sa;pwd=123456;database=cyqdata;provider=pg;
  10.     DB2:  Database=SAMPLE;User ID=administrator;Server=127.0.0.1;password=1234560;provider=db2;
  11. FireBird  user id=SYSDBA;password=123456;database=d:\\test.dbf;server type=Default;data source=127.0.0.1;port number=3050;provider=firebird;
  12. Dameng::  user id=SYSDBA;password=123456789;data source=127.0.0.1;schema=test;provider=dameng;
  13. KingBaseES server=127.0.0.1;User Id=system;Password=123456;Database=test;Port=54321;schema=public;provider=kingbasees;
  14. Oracle ODP.NET::
  15. Data Source=(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=127.0.0.1)(PORT = 1521)))(CONNECT_DATA =(SID = orcl)));User ID=sa;password=123456
  16. 由于各种数据库链接语句基本一致,除了特定写法外,可以通过链接补充:provider=mssql、provider=mysql、provider=db2、provider=postgre等来区分。
  17. ###--------------------------------------------------------###
复制代码
8、总结:

由于 Taurus.DTS 分布式任务发布组件的独立发布,原有发布的 Taurus.DTC 分布式事务组件,下一版本会移除掉其基础的任务发布订阅功能,保留事务的纯洁属性。
今天发布的 Taurus.DTS 组件,为 .Net 和  .Net Core 微服务系列又又贡献了一个新的组件。
后续会发布分布式锁的教程,这个已经在 CYQ.Data 里实现了,并且在也在 Taurus.DTC 和 Taurus.DTS 中使用到了。

来源:https://www.cnblogs.com/cyq1162/p/17971549
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具