|
至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用
技术栈用到了:BackgroundService和NCrontab库
第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:- public interface IScheduleTask
- {
- Task ExecuteAsync();
- }
- public abstract class ScheduleTask : IScheduleTask
- {
- public virtual Task ExecuteAsync()
- {
- return Task.CompletedTask;
- }
- }
复制代码 第二步定义特性标注任务执行周期等信的metadata- [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
- public class ScheduleTaskAttribute(string cron) : Attribute
- {
- /// <summary>
- /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron
- /// 最小单位为分钟
- /// </summary>
- public string Cron { get; set; } = cron;
- public string? Description { get; set; }
- /// <summary>
- /// 是否异步执行.默认false会阻塞接下来的同类任务
- /// </summary>
- public bool IsAsync { get; set; } = false;
- /// <summary>
- /// 是否初始化即启动,默认false
- /// </summary>
- public bool IsStartOnInit { get; set; } = false;
- }
复制代码 第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:- public interface IScheduler
- {
- /// <summary>
- /// 判断当前的任务是否可以执行
- /// </summary>
- bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
- }
复制代码 好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:- public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
- {
- public Type ScheduleTaskType { get; set; } = scheduleTaskType;
- public string Cron { get; set; } = cron;
- public string? Description { get; set; }
- public bool IsAsync { get; set; } = false;
- public bool IsStartOnInit { get; set; } = false;
- }
- public interface IScheduleMetadataStore
- {
- /// <summary>
- /// 获取所有ScheduleTaskMetadata
- /// </summary>
- Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
- }
复制代码 实现一个Configuration级别的Store- internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
- {
- const string Key = "BiwenQuickApi:Schedules";
- public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
- {
- var options = configuration.GetSection(Key).GetChildren();
- if (options?.Any() is true)
- {
- var metadatas = options.Select(x =>
- {
- var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);
- if (type is null)
- throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");
- return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)
- {
- Description = x[nameof(ConfigurationScheduleOption.Description)],
- IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),
- IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),
- };
- });
- return Task.FromResult(metadatas);
- }
- return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());
- }
- }
复制代码 然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent,具体可以参考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088- public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
- {
- /// <summary>
- /// 任务
- /// </summary>
- public IScheduleTask ScheduleTask { get; set; } = scheduleTask;
- /// <summary>
- /// 触发时间
- /// </summary>
- public DateTime EventTime { get; set; } = eventTime;
- }
- /// <summary>
- /// 执行完成
- /// </summary>
- public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
- {
- /// <summary>
- /// 执行结束的时间
- /// </summary>
- public DateTime EndTime { get; set; } = endTime;
- }
- /// <summary>
- /// 执行开始
- /// </summary>
- public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
- /// <summary>
- /// 执行失败
- /// </summary>
- public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
- {
- /// <summary>
- /// 异常信息
- /// </summary>
- public Exception Exception { get; private set; } = exception;
- }
复制代码 接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:- internal class SampleNCrontabScheduler : IScheduler
- {
- /// <summary>
- /// 暂存上次执行时间
- /// </summary>
- private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();
- public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
- {
- var now = DateTime.Now;
- var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);
- if (!haveExcuteTime)
- {
- var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
- LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);
- //如果不是初始化启动,则不执行
- if (!scheduleMetadata.IsStartOnInit)
- return false;
- }
- if (now >= time)
- {
- var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
- //更新下次执行时间
- LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);
- return true;
- }
- return false;
- }
- }
复制代码 然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:- internal class ScheduleBackgroundService : BackgroundService
- {
- private static readonly TimeSpan _pollingTime
- #if DEBUG
- //轮询20s 测试环境下,方便测试。
- = TimeSpan.FromSeconds(20);
- #endif
- #if !DEBUG
- //轮询60s 正式环境下,考虑性能轮询时间延长到60s
- = TimeSpan.FromSeconds(60);
- #endif
- //心跳10s.
- private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);
- private readonly ILogger<ScheduleBackgroundService> _logger;
- private readonly IServiceProvider _serviceProvider;
- public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)
- {
- _logger = logger;
- _serviceProvider = serviceProvider;
- }
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- while (!stoppingToken.IsCancellationRequested)
- {
- var pollingDelay = Task.Delay(_pollingTime, stoppingToken);
- try
- {
- await RunAsync(stoppingToken);
- }
- catch (Exception ex)
- {
- //todo:
- _logger.LogError(ex.Message);
- }
- await WaitAsync(pollingDelay, stoppingToken);
- }
- }
- private async Task RunAsync(CancellationToken stoppingToken)
- {
- using var scope = _serviceProvider.CreateScope();
- var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();
- if (tasks is null || !tasks.Any())
- {
- return;
- }
- //调度器
- var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();
- async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)
- {
- if (scheduler.CanRun(metadata, DateTime.Now))
- {
- var eventTime = DateTime.Now;
- //通知启动
- _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);
- try
- {
- if (metadata.IsAsync)
- {
- //异步执行
- _ = task.ExecuteAsync();
- }
- else
- {
- //同步执行
- await task.ExecuteAsync();
- }
- //执行完成
- _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);
- }
- catch (Exception ex)
- {
- _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);
- }
- }
- };
- //注解中的task
- foreach (var task in tasks)
- {
- if (stoppingToken.IsCancellationRequested)
- {
- break;
- }
- //标注的metadatas
- var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();
- if (!metadatas.Any())
- {
- continue;
- }
- foreach (var metadata in metadatas)
- {
- await DoTaskAsync(task, metadata);
- }
- }
- //store中的scheduler
- var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();
- //并行执行,提高性能
- Parallel.ForEach(stores, async store =>
- {
- if (stoppingToken.IsCancellationRequested)
- {
- return;
- }
- var metadatas = await store.GetAllAsync();
- if (metadatas is null || !metadatas.Any())
- {
- return;
- }
- foreach (var metadata in metadatas)
- {
- var attr = new ScheduleTaskAttribute(metadata.Cron)
- {
- Description = metadata.Description,
- IsAsync = metadata.IsAsync,
- IsStartOnInit = metadata.IsStartOnInit,
- };
- var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;
- if (task is null)
- {
- return;
- }
- await DoTaskAsync(task, attr);
- }
- });
- }
- private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)
- {
- try
- {
- await Task.Delay(_minIdleTime, stoppingToken);
- await pollingDelay;
- }
- catch (OperationCanceledException)
- {
- }
- }
- }
复制代码 最后收尾阶段我们老规矩扩展一下IServiceCollection:- internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
- {
- foreach (var task in ScheduleTasks)
- {
- services.AddTransient(task);
- services.AddTransient(typeof(IScheduleTask), task);
- }
- //调度器
- services.AddScheduler<SampleNCrontabScheduler>();
- //配置文件Store:
- services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();
- //BackgroundService
- services.AddHostedService<ScheduleBackgroundService>();
- return services;
- }
- /// <summary>
- /// 注册调度器AddScheduler
- /// </summary>
- public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler
- {
- services.AddSingleton<IScheduler, T>();
- return services;
- }
- /// <summary>
- /// 注册ScheduleMetadataStore
- /// </summary>
- public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore
- {
- services.AddSingleton<IScheduleMetadataStore, T>();
- return services;
- }
复制代码 老规矩我们来测试一下:- //通过特性标注的方式执行:
- [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次
- [ScheduleTask("0/3 * * * *")]//每3分钟执行一次
- public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
- {
- public async Task ExecuteAsync()
- {
- //执行5s
- await Task.Delay(TimeSpan.FromSeconds(5));
- logger.LogInformation("keep alive!");
- }
- }
- public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
- {
- public Task ExecuteAsync()
- {
- logger.LogInformation("Demo Config Schedule Done!");
- return Task.CompletedTask;
- }
- }
复制代码 通过配置文件的方式配置Store:- {
- "BiwenQuickApi": {
- "Schedules": [
- {
- "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
- "Cron": "0/5 * * * *",
- "Description": "Every 5 mins",
- "IsAsync": true,
- "IsStartOnInit": false
- },
- {
- "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
- "Cron": "0/10 * * * *",
- "Description": "Every 10 mins",
- "IsAsync": false,
- "IsStartOnInit": true
- }
- ]
- }
- }
复制代码 我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:- public class DemoStore : IScheduleMetadataStore
- {
- public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
- {
- //模拟从数据库或配置文件中获取ScheduleTaskMetadata
- IEnumerable<ScheduleTaskMetadata> metadatas =
- [
- new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))
- {
- Description="测试的Schedule"
- },
- ];
- return Task.FromResult(metadatas);
- }
- }
- //然后注册这个Store:
- builder.Services.AddScheduleMetadataStore<DemoStore>();
复制代码 所有的一切都大功告成,最后我们来跑一下Demo,成功了:
当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!
源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling
来源:https://www.cnblogs.com/vipwan/p/18194062
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|