翼度科技»论坛 编程开发 .net 查看内容

扩展ABP的Webhook功能,推送数据到第三方接口(企业微信群、钉钉群等)

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
前言

在上一篇文章【基于ASP.NET ZERO,开发SaaS版供应链管理系统】中有提到对Webhook功能的扩展改造,本文详细介绍一下具体过程。

Webhook功能操作说明,请参见此文档链接:Webhook数据推送

Webhook功能发布日期:


  • ASP.NET Boilerplate(以下简称ABP)在v5.2(2020-02-18)版本中发布了Webhook功能,详细说明,请参见:官方帮助链接
  • ASP.NET ZERO(以下简称ZERO)在v8.2.0(2020-02-20)版本中发布了Webhook功能;
  • 我们系统是在2021年4月完成了对Webhook功能的改造:内部接口(用户自行设定接口地址的)、第三方接口(微信内部群、钉钉群、聚水潭API等)。
1、Webhook定义


  • 为了区分内部接口与第三方接口,在第三方接口名称前统一附加特定前缀,如:Third.WX.XXX、Third.DD.XXX等;
  • 添加定义条目时候设定对应的特性(featureDependency),基于特性功能对不同租户显示或者隐藏定义的条目。
  1.     public class AppWebhookDefinitionProvider : WebhookDefinitionProvider
  2.     {
  3.         public override void SetWebhooks(IWebhookDefinitionContext context)
  4.         {
  5.             //物料档案 - 全部可见
  6.             context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Created));
  7.             context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Updated));
  8.             context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Deleted));
  9.             //生产订单 - 生产管理可见
  10.             var featureC = new SimpleFeatureDependency("SCM.C");
  11.             context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Created, featureDependency: featureC));
  12.             context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Updated, featureDependency: featureC));
  13.             context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Deleted, featureDependency: featureC));
  14.             context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_MRP_Data, featureDependency: featureC));
  15.             //...
  16.         }
  17.     }
复制代码

  • CoreModule中添加Webhook定义,并设定参数选项:
  1.     public class SCMCoreModule : AbpModule
  2.     {
  3.         public override void PreInitialize()
  4.         {
  5.             Configuration.Webhooks.Providers.Add<AppWebhookDefinitionProvider>();
  6.             Configuration.Webhooks.TimeoutDuration = TimeSpan.FromMinutes(1);
  7.             Configuration.Webhooks.IsAutomaticSubscriptionDeactivationEnabled = true;
  8.             Configuration.Webhooks.MaxSendAttemptCount = 3;
  9.             Configuration.Webhooks.MaxConsecutiveFailCountBeforeDeactivateSubscription = 10;
  10.             //...
  11.         }
  12.         //...
  13.     }
复制代码
2、Webhook订阅


  • 前端用户创建Webhook订阅记录(WebhookUri、Webhooks、Headers等),之后传递到后端API;
  • 后端API通过WebhookSubscriptionManager添加保存WebhookSubscription(Webhook订阅):
  1.     [AbpAuthorize(AppPermissions.Pages_Administration_WebhookSubscription)]
  2.     public class WebhookSubscriptionAppService : SCMAppServiceBase, IWebhookSubscriptionAppService
  3.     {
  4.         //...
  5.         [AbpAuthorize(AppPermissions.Pages_Administration_WebhookSubscription_Create)]
  6.         public async Task AddSubscription(WebhookSubscription subscription)
  7.         {
  8.             subscription.TenantId = AbpSession.TenantId;
  9.             await _webHookSubscriptionManager.AddOrUpdateSubscriptionAsync(subscription);
  10.         }
  11.         //...
  12.     }
复制代码
3、Webhook发布(数据推送)

监测实体事件(CreatedEvent、UpdatedEvent、DeletedEvent)数据,按租户用户创建的Webhook订阅,推送数据:
  1.     public class T11071001Syncronizer :
  2.         IEventHandler<EntityCreatedEventData<T11071001>>,
  3.         IEventHandler<EntityUpdatedEventData<T11071001>>,
  4.         IEventHandler<EntityDeletedEventData<T11071001>>,
  5.         ITransientDependency
  6.     {
  7.         private readonly IAppWebhookPublisher _appWebhookPublisher;
  8.         public T11071001Syncronizer(IAppWebhookPublisher appWebhookPublisher)
  9.         {
  10.             _appWebhookPublisher = appWebhookPublisher;
  11.         }
  12.         public void HandleEvent(EntityCreatedEventData<T11071001> eventData)
  13.         {
  14.             DoWebhook("N", eventData.Entity);
  15.         }
  16.         public void HandleEvent(EntityUpdatedEventData<T11071001> eventData)
  17.         {
  18.             DoWebhook("U", eventData.Entity);
  19.         }
  20.         public void HandleEvent(EntityDeletedEventData<T11071001> eventData)
  21.         {
  22.             int? tenantId = eventData.Entity.TenantId;
  23.             string whName = AppWebHookNames.T11071001_Deleted;
  24.             var subscriptions = _appWebhookPublisher.GetSubscriptions(tenantId, whName);
  25.             if (subscriptions == null) { return; }
  26.             _appWebhookPublisher.PublishWebhookUOW(whName, eventData.Entity, tenantId, subscriptions);
  27.         }
  28.     }
复制代码

  • DoWebhook()方法:基于具体的订阅(内部接口、第三方接口等)推送对应的内容:
  1.         private void DoWebhook(string nu, T11071001 entity)
  2.         {
  3.             int? tenantId = entity.TenantId;
  4.             var whCache = _appWebhookPublisher.GetWebhookCache(tenantId); if (whCache.Count == 0) { return; }
  5.             string whName = nu == "N" ? AppWebHookNames.T11071001_Created : AppWebHookNames.T11071001_Updated;
  6.             string whNameWX = AppWebHookNames.WX_T11071001_Created;
  7.             string whNameDD = AppWebHookNames.DD_T11071001_Created;
  8.             bool isWH = whCache.Names.ContainsKey(whName);
  9.             bool isWX = whCache.Names.ContainsKey(whNameWX);
  10.             bool isDD = whCache.Names.ContainsKey(whNameDD);
  11.             if (!(isWH || isWX || isDD)) { return; }
  12.             var data = ObjectMapper.Map<T11071001WebhookDto>(entity);
  13.             //内部接口
  14.             if (isWH)
  15.             {
  16.                 _appWebhookPublisher.PublishWebhookUOW(whName, data, tenantId, whCache.Names[whName], false);
  17.             }
  18.             //企业微信内部群
  19.             if (isWX)
  20.             {
  21.                 var wxData = new WxTCardWebhookDto { template_card = GetWxTCard(data, tenantId, nu) };
  22.                 _appWebhookPublisher.PublishWebhookUOW(whNameWX, wxData, tenantId, whCache.Names[whNameWX], true);
  23.             }
  24.             //钉钉内部群
  25.             if (isDD)
  26.             {
  27.                 var title = GetNUTitle(nu, L(T));
  28.                 var mdText = GetNewMarkdown(data, title);
  29.                 var ddData = new DdMarkdownWebhookDto { markdown = new DdMarkdownContentDto { title = title, text = mdText } };
  30.                 _appWebhookPublisher.PublishWebhookUOW(whNameDD, ddData, tenantId, whCache.Names[whNameDD], true);
  31.             }
  32.         }
复制代码

  • GetWebhookCache()方法:实现按租户缓存Webhook订阅的数据:
  1.         public SCMWebhookCacheItem GetWebhookCache(int? tenantId)
  2.         {
  3.            return SetAndGetCache(tenantId);
  4.         }
  5.         private SCMWebhookCacheItem SetAndGetCache(int? tenantId, string keyName = "SubscriptionCount")
  6.         {
  7.            int tid = tenantId ?? 0; var cacheKey = $"{keyName}-{tid}";
  8.            return _cacheManager.GetSCMWebhookCache().Get(cacheKey, () =>
  9.            {
  10.                 int count = 0;
  11.                 var names = new Dictionary<string, List<WebhookSubscription>>();
  12.                 UnitOfWorkManager.WithUnitOfWork(() =>
  13.                 {
  14.                     using (UnitOfWorkManager.Current.SetTenantId(tenantId))
  15.                     {
  16.                         if (_featureChecker.IsEnabled(tid, "SCM.H")) //Feature核查
  17.                         {
  18.                             var items = _webhookSubscriptionRepository.GetAllList(e => e.TenantId == tenantId && e.IsActive == true);
  19.                             count = items.Count;
  20.                             foreach (var item in items)
  21.                             {
  22.                                 if (string.IsNullOrWhiteSpace(item.Webhooks)) { continue; }
  23.                                 var whNames = JsonHelper.DeserializeObject<string[]>(item.Webhooks); if (whNames == null) { continue; }
  24.                                 foreach (string whName in whNames)
  25.                                 {
  26.                                     if (names.ContainsKey(whName))
  27.                                     {
  28.                                         names[whName].Add(item.ToWebhookSubscription());
  29.                                     }
  30.                                     else
  31.                                     {
  32.                                         names.Add(whName, new List<WebhookSubscription> { item.ToWebhookSubscription() });
  33.                                     }
  34.                                 }
  35.                             }
  36.                         }
  37.                     }
  38.                 });
  39.                 return new SCMWebhookCacheItem(count, names);
  40.             });
  41.         }
复制代码

  • PublishWebhookUOW()方法:替换ABP中WebHookPublisher的默认实现,直接按传入的订阅,通过WebhookSenderJob推送数据:
  1.         public void PublishWebhookUOW(string webHookName, object data, int? tenantId, List<WebhookSubscription> webhookSubscriptions = null, bool sendExactSameData = false)
  2.         {
  3.             UnitOfWorkManager.WithUnitOfWork(() =>
  4.             {
  5.                 using (UnitOfWorkManager.Current.SetTenantId(tenantId))   
  6.                 {
  7.                     Publish(webHookName, data, tenantId, webhookSubscriptions, sendExactSameData);
  8.                 }
  9.             });
  10.         }
  11.         private void Publish(string webhookName, object data, int? tenantId, List<WebhookSubscription> webhookSubscriptions, bool sendExactSameData = false)
  12.         {
  13.             if (string.IsNullOrWhiteSpace(webhookName)) { return; }
  14.             //若无直接传入订阅则按webhookName查询
  15.             webhookSubscriptions ??= _webhookSubscriptionRepository.GetAllList(subscriptionInfo =>
  16.                     subscriptionInfo.TenantId == tenantId &&
  17.                     subscriptionInfo.IsActive &&
  18.                     subscriptionInfo.Webhooks.Contains(""" + webhookName + """)
  19.                 ).Select(subscriptionInfo => subscriptionInfo.ToWebhookSubscription()).ToList();
  20.             if (webhookSubscriptions.IsNullOrEmpty()) { return; }
  21.             var webhookInfo = SaveAndGetWebhookEvent(tenantId, webhookName, data);
  22.             foreach (var webhookSubscription in webhookSubscriptions)
  23.             {
  24.                 var jobArgs = new WebhookSenderArgs
  25.                 {
  26.                     TenantId = webhookSubscription.TenantId,
  27.                     WebhookEventId = webhookInfo.Id,
  28.                     Data = webhookInfo.Data,
  29.                     WebhookName = webhookInfo.WebhookName,
  30.                     WebhookSubscriptionId = webhookSubscription.Id,
  31.                     Headers = webhookSubscription.Headers,
  32.                     Secret = webhookSubscription.Secret,
  33.                     WebhookUri = webhookSubscription.WebhookUri,
  34.                     SendExactSameData = sendExactSameData
  35.                 };
  36.                 //指定队列执行任务,由触发事件的server执行
  37.                 IBackgroundJobClient hangFireClient = new BackgroundJobClient();
  38.                 hangFireClient.Create<WebhookSenderJob>(x => x.ExecuteAsync(jobArgs), new EnqueuedState(AppVersionHelper.MachineName));
  39.             }
  40.         }
复制代码

  • WebhookSenderJob:重写WebhookManager的SignWebhookRequest方法,对于第三方接口,不添加签名的Header:
  1.         public override void SignWebhookRequest(HttpRequestMessage request, string serializedBody, string secret)
  2.         {
  3.             if (request == null)
  4.             {
  5.                 throw new ArgumentNullException(nameof(request));
  6.             }
  7.             //第三方接口,不添加签名Header
  8.             if (IsThirdAPI(request))
  9.             {
  10.                 return;
  11.             }
  12.             if (string.IsNullOrWhiteSpace(serializedBody))
  13.             {
  14.                 throw new ArgumentNullException(nameof(serializedBody));
  15.             }
  16.             var secretBytes = Encoding.UTF8.GetBytes(secret);
  17.             using (var hasher = new HMACSHA256(secretBytes))
  18.             {
  19.                 request.Content = new StringContent(serializedBody, Encoding.UTF8, "application/json");
  20.                 var data = Encoding.UTF8.GetBytes(serializedBody);
  21.                 var sha256 = hasher.ComputeHash(data);
  22.                 var headerValue = string.Format(CultureInfo.InvariantCulture, SignatureHeaderValueTemplate, BitConverter.ToString(sha256));
  23.                 request.Headers.Add(SignatureHeaderName, headerValue);
  24.             }
  25.         }
复制代码

  • WebhookSenderJob:重写WebhookSender的CreateWebhookRequestMessage方法,对于第三方接口,进行特殊处理:
  1.         protected override HttpRequestMessage CreateWebhookRequestMessage(WebhookSenderArgs webhookSenderArgs)
  2.         {
  3.             return webhookSenderArgs.WebhookName switch
  4.             {
  5.                 AppWebHookNames.JST_supplier_upload => JSTHttpRequestMessage(webhookSenderArgs), //聚水潭 - 供应商上传
  6.                 //...
  7.                 _ => new HttpRequestMessage(HttpMethod.Post, webhookSenderArgs.WebhookUri)
  8.             };
  9.         }
复制代码

  • WebhookSenderJob:重写WebhookSender的AddAdditionalHeaders方法, 对于第三方接口,不添加Headers:
  1.         protected override void AddAdditionalHeaders(HttpRequestMessage request, WebhookSenderArgs webhookSenderArgs)
  2.         {
  3.             //第三方接口,不添加Header
  4.             if (IsThirdAPI(request))
  5.             {
  6.                 return;
  7.             }
  8.             foreach (var header in webhookSenderArgs.Headers)
  9.             {
  10.                 if (request.Headers.TryAddWithoutValidation(header.Key, header.Value))
  11.                 {
  12.                     continue;
  13.                 }
  14.                 if (request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value))
  15.                 {
  16.                     continue;
  17.                 }
  18.                 throw new Exception($"Invalid Header. SubscriptionId:{webhookSenderArgs.WebhookSubscriptionId},Header: {header.Key}:{header.Value}");
  19.             }
  20.         }
复制代码

  • WebhookSenderJob:重写WebhookSender的SendHttpRequest方法,处理第三方接口的回传数据:
  1.         protected override async Task<(bool isSucceed, HttpStatusCode statusCode, string content)> SendHttpRequest(HttpRequestMessage request)
  2.         {
  3.             using var client = _httpClientFactory.CreateClient(); //避免使用 new HttpClient()方式
  4.             client.Timeout = _webhooksConfiguration.TimeoutDuration;
  5.             var response = await client.SendAsync(request);
  6.             var isSucceed = response.IsSuccessStatusCode;
  7.             var statusCode = response.StatusCode;
  8.             var content = await response.Content.ReadAsStringAsync();
  9.             //第三方接口,需要处理回传的数据     
  10.             if (IsThirdAPI(request))
  11.             {
  12.                 string method = TryGetHeader(request.Headers, "ThirdAPI1");
  13.                 int tenantId = Convert.ToInt32(TryGetHeader(request.Headers, "ThirdAPI2"));
  14.                 switch (method)
  15.                 {
  16.                     case AppWebHookNames.JST_supplier_upload: await JSTSupplierUploadResponse(method, content, tenantId); break;
  17.                     //...
  18.                     default: break;
  19.                 }
  20.             }
  21.             return (isSucceed, statusCode, content);
  22.         }
复制代码
总结

基于ABP/ZERO的Webhook功能实现,进行一些扩展改造,可以实现业务数据按用户订阅进行推送,包括推送到第三方接口(企业微信群、钉钉等),在很大程度上提升了业务系统的灵活性与实用性。

来源:https://www.cnblogs.com/freedyang/archive/2023/09/08/17684464.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具