翼度科技»论坛 编程开发 .net 查看内容

NETCore中实现一个轻量无负担的极简任务调度ScheduleTask

9

主题

9

帖子

27

积分

新手上路

Rank: 1

积分
27
至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用
技术栈用到了:BackgroundService和NCrontab库
第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:
  1.     public interface IScheduleTask
  2.     {
  3.         Task ExecuteAsync();
  4.     }
  5.     public abstract class ScheduleTask : IScheduleTask
  6.     {
  7.         public virtual Task ExecuteAsync()
  8.         {
  9.             return Task.CompletedTask;
  10.         }
  11.     }
复制代码
第二步定义特性标注任务执行周期等信的metadata
  1.     [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
  2.     public class ScheduleTaskAttribute(string cron) : Attribute
  3.     {
  4.         /// <summary>
  5.         /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron
  6.         /// 最小单位为分钟
  7.         /// </summary>
  8.         public string Cron { get; set; } = cron;
  9.         public string? Description { get; set; }
  10.         /// <summary>
  11.         /// 是否异步执行.默认false会阻塞接下来的同类任务
  12.         /// </summary>
  13.         public bool IsAsync { get; set; } = false;
  14.         /// <summary>
  15.         /// 是否初始化即启动,默认false
  16.         /// </summary>
  17.         public bool IsStartOnInit { get; set; } = false;
  18.     }
复制代码
第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:
  1.     public interface IScheduler
  2.     {
  3.         /// <summary>
  4.         /// 判断当前的任务是否可以执行
  5.         /// </summary>
  6.         bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
  7.     }
复制代码
好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:
  1.     public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
  2.     {
  3.         public Type ScheduleTaskType { get; set; } = scheduleTaskType;
  4.         public string Cron { get; set; } = cron;
  5.         public string? Description { get; set; }
  6.         public bool IsAsync { get; set; } = false;
  7.         public bool IsStartOnInit { get; set; } = false;
  8.     }
  9.     public interface IScheduleMetadataStore
  10.     {
  11.         /// <summary>
  12.         /// 获取所有ScheduleTaskMetadata
  13.         /// </summary>
  14.         Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
  15.     }
复制代码
实现一个Configuration级别的Store
  1.     internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
  2.     {
  3.         const string Key = "BiwenQuickApi:Schedules";
  4.         public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
  5.         {
  6.             var options = configuration.GetSection(Key).GetChildren();
  7.             if (options?.Any() is true)
  8.             {
  9.                 var metadatas = options.Select(x =>
  10.                 {
  11.                     var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);
  12.                     if (type is null)
  13.                         throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");
  14.                     return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)
  15.                     {
  16.                         Description = x[nameof(ConfigurationScheduleOption.Description)],
  17.                         IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),
  18.                         IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),
  19.                     };
  20.                 });
  21.                 return Task.FromResult(metadatas);
  22.             }
  23.             return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());
  24.         }
  25.     }
复制代码
然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent,具体可以参考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088
  1.     public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
  2.     {
  3.         /// <summary>
  4.         /// 任务
  5.         /// </summary>
  6.         public IScheduleTask ScheduleTask { get; set; } = scheduleTask;
  7.         /// <summary>
  8.         /// 触发时间
  9.         /// </summary>
  10.         public DateTime EventTime { get; set; } = eventTime;
  11.     }
  12.     /// <summary>
  13.     /// 执行完成
  14.     /// </summary>
  15.     public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
  16.     {
  17.         /// <summary>
  18.         /// 执行结束的时间
  19.         /// </summary>
  20.         public DateTime EndTime { get; set; } = endTime;
  21.     }
  22.     /// <summary>
  23.     /// 执行开始
  24.     /// </summary>
  25.     public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
  26.     /// <summary>
  27.     /// 执行失败
  28.     /// </summary>
  29.     public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
  30.     {
  31.         /// <summary>
  32.         /// 异常信息
  33.         /// </summary>
  34.         public Exception Exception { get; private set; } = exception;
  35.     }
复制代码
接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:
  1.     internal class SampleNCrontabScheduler : IScheduler
  2.     {
  3.         /// <summary>
  4.         /// 暂存上次执行时间
  5.         /// </summary>
  6.         private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();
  7.         public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
  8.         {
  9.             var now = DateTime.Now;
  10.             var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);
  11.             if (!haveExcuteTime)
  12.             {
  13.                 var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
  14.                 LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);
  15.                 //如果不是初始化启动,则不执行
  16.                 if (!scheduleMetadata.IsStartOnInit)
  17.                     return false;
  18.             }
  19.             if (now >= time)
  20.             {
  21.                 var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
  22.                 //更新下次执行时间
  23.                 LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);
  24.                 return true;
  25.             }
  26.             return false;
  27.         }
  28.     }
复制代码
然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:
  1.     internal class ScheduleBackgroundService : BackgroundService
  2.     {
  3.         private static readonly TimeSpan _pollingTime
  4. #if DEBUG
  5.           //轮询20s 测试环境下,方便测试。
  6.           = TimeSpan.FromSeconds(20);
  7. #endif
  8. #if !DEBUG
  9.          //轮询60s 正式环境下,考虑性能轮询时间延长到60s
  10.          = TimeSpan.FromSeconds(60);
  11. #endif
  12.         //心跳10s.
  13.         private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);
  14.         private readonly ILogger<ScheduleBackgroundService> _logger;
  15.         private readonly IServiceProvider _serviceProvider;
  16.         public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)
  17.         {
  18.             _logger = logger;
  19.             _serviceProvider = serviceProvider;
  20.         }
  21.         protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  22.         {
  23.             while (!stoppingToken.IsCancellationRequested)
  24.             {
  25.                 var pollingDelay = Task.Delay(_pollingTime, stoppingToken);
  26.                 try
  27.                 {
  28.                     await RunAsync(stoppingToken);
  29.                 }
  30.                 catch (Exception ex)
  31.                 {
  32.                     //todo:
  33.                     _logger.LogError(ex.Message);
  34.                 }
  35.                 await WaitAsync(pollingDelay, stoppingToken);
  36.             }
  37.         }
  38.         private async Task RunAsync(CancellationToken stoppingToken)
  39.         {
  40.             using var scope = _serviceProvider.CreateScope();
  41.             var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();
  42.             if (tasks is null || !tasks.Any())
  43.             {
  44.                 return;
  45.             }
  46.             //调度器
  47.             var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();
  48.             async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)
  49.             {
  50.                 if (scheduler.CanRun(metadata, DateTime.Now))
  51.                 {
  52.                     var eventTime = DateTime.Now;
  53.                     //通知启动
  54.                     _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);
  55.                     try
  56.                     {
  57.                         if (metadata.IsAsync)
  58.                         {
  59.                             //异步执行
  60.                             _ = task.ExecuteAsync();
  61.                         }
  62.                         else
  63.                         {
  64.                             //同步执行
  65.                             await task.ExecuteAsync();
  66.                         }
  67.                         //执行完成
  68.                         _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);
  69.                     }
  70.                     catch (Exception ex)
  71.                     {
  72.                         _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);
  73.                     }
  74.                 }
  75.             };
  76.             //注解中的task
  77.             foreach (var task in tasks)
  78.             {
  79.                 if (stoppingToken.IsCancellationRequested)
  80.                 {
  81.                     break;
  82.                 }
  83.                 //标注的metadatas
  84.                 var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();
  85.                 if (!metadatas.Any())
  86.                 {
  87.                     continue;
  88.                 }
  89.                 foreach (var metadata in metadatas)
  90.                 {
  91.                     await DoTaskAsync(task, metadata);
  92.                 }
  93.             }
  94.             //store中的scheduler
  95.             var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();
  96.             //并行执行,提高性能
  97.             Parallel.ForEach(stores, async store =>
  98.             {
  99.                 if (stoppingToken.IsCancellationRequested)
  100.                 {
  101.                     return;
  102.                 }
  103.                 var metadatas = await store.GetAllAsync();
  104.                 if (metadatas is null || !metadatas.Any())
  105.                 {
  106.                     return;
  107.                 }
  108.                 foreach (var metadata in metadatas)
  109.                 {
  110.                     var attr = new ScheduleTaskAttribute(metadata.Cron)
  111.                     {
  112.                         Description = metadata.Description,
  113.                         IsAsync = metadata.IsAsync,
  114.                         IsStartOnInit = metadata.IsStartOnInit,
  115.                     };
  116.                     var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;
  117.                     if (task is null)
  118.                     {
  119.                         return;
  120.                     }
  121.                     await DoTaskAsync(task, attr);
  122.                 }
  123.             });
  124.         }
  125.         private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)
  126.         {
  127.             try
  128.             {
  129.                 await Task.Delay(_minIdleTime, stoppingToken);
  130.                 await pollingDelay;
  131.             }
  132.             catch (OperationCanceledException)
  133.             {
  134.             }
  135.         }
  136.     }
复制代码
最后收尾阶段我们老规矩扩展一下IServiceCollection:
  1.         internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
  2.         {
  3.             foreach (var task in ScheduleTasks)
  4.             {
  5.                 services.AddTransient(task);
  6.                 services.AddTransient(typeof(IScheduleTask), task);
  7.             }
  8.             //调度器
  9.             services.AddScheduler<SampleNCrontabScheduler>();
  10.             //配置文件Store:
  11.         services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();
  12.             //BackgroundService
  13.            services.AddHostedService<ScheduleBackgroundService>();
  14.             return services;
  15.         }
  16.         /// <summary>
  17.         /// 注册调度器AddScheduler
  18.         /// </summary>
  19.         public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler
  20.         {
  21.             services.AddSingleton<IScheduler, T>();
  22.             return services;
  23.         }
  24.         /// <summary>
  25.         /// 注册ScheduleMetadataStore
  26.         /// </summary>
  27.         public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore
  28.         {
  29.             services.AddSingleton<IScheduleMetadataStore, T>();
  30.             return services;
  31.         }
复制代码
老规矩我们来测试一下:
  1.     //通过特性标注的方式执行:
  2.     [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次
  3.     [ScheduleTask("0/3 * * * *")]//每3分钟执行一次
  4.     public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
  5.     {
  6.         public async Task ExecuteAsync()
  7.         {
  8.             //执行5s
  9.             await Task.Delay(TimeSpan.FromSeconds(5));
  10.             logger.LogInformation("keep alive!");
  11.         }
  12.     }
  13.         public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
  14.     {
  15.         public Task ExecuteAsync()
  16.         {
  17.             logger.LogInformation("Demo Config Schedule Done!");
  18.             return Task.CompletedTask;
  19.         }
  20.     }
复制代码
通过配置文件的方式配置Store:
  1. {
  2.   "BiwenQuickApi": {
  3.     "Schedules": [
  4.       {
  5.         "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
  6.         "Cron": "0/5 * * * *",
  7.         "Description": "Every 5 mins",
  8.         "IsAsync": true,
  9.         "IsStartOnInit": false
  10.       },
  11.       {
  12.         "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
  13.         "Cron": "0/10 * * * *",
  14.         "Description": "Every 10 mins",
  15.         "IsAsync": false,
  16.         "IsStartOnInit": true
  17.       }
  18.     ]
  19.   }
  20. }
复制代码
我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:
  1.     public class DemoStore : IScheduleMetadataStore
  2.     {
  3.         public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
  4.         {
  5.             //模拟从数据库或配置文件中获取ScheduleTaskMetadata
  6.             IEnumerable<ScheduleTaskMetadata> metadatas =
  7.                 [
  8.                     new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))
  9.                     {
  10.                         Description="测试的Schedule"
  11.                     },
  12.                 ];
  13.             return Task.FromResult(metadatas);
  14.         }
  15.     }
  16.         //然后注册这个Store:
  17.         builder.Services.AddScheduleMetadataStore<DemoStore>();
复制代码
所有的一切都大功告成,最后我们来跑一下Demo,成功了:

当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!
2024/05/16更新:
提供同一时间单一运行中的任务实现
  1. /// <summary>
  2. /// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟.
  3. /// </summary>
  4. /// <param name="logger"></param>
  5. [ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]
  6.     public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask
  7.     {
  8.         public override Task OnAbort()
  9.         {
  10.             logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!");
  11.             return Task.CompletedTask;
  12.         }
  13.         public override async Task ExecuteAsync()
  14.         {
  15.             var now = DateTime.Now;
  16.             //模拟一个耗时2分钟的任务
  17.             await Task.Delay(TimeSpan.FromMinutes(2));
  18.             logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!");
  19.         }
  20.     }
复制代码
源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling

来源:https://www.cnblogs.com/vipwan/p/18194062/biwen-quickapi-scheduletask
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具