翼度科技»论坛 编程开发 .net 查看内容

SignalR+Hangfire 实现后台任务队列和实时通讯

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
SignalR+Hangfire 实现后台任务队列和实时通讯

1.简介:
SignalR是一个.NET的开源框架,SignalR可使用Web Socket, Server Sent Events 和 Long Polling作为底层传输方式实现服务端和客户端的实时数据交互。
Hangfire是一个.NET的开源后台任务框架 提供统一的编程模型,以可靠的方式处理后台任务
2.目的:
通过SignalR+Hangfire,我们可以实现一些需要较长时间处理的任务,并在完成及时的通知前端处理结果。
3.以下是我使用SignalR+Hangfire的开发需求:
在net6 webapi的情况下,前端是vue+ts,我现在有个需要就是,我写了一个接口,是对接stable diffusion webui 文生图的接口,前端第一个人请求,返回图没有问题,
但是,此时在生成图的过程中,第二个人请求,我希望加入到一个队列或者别的方式 ,把这个请求放着,我处理完第一个请求之后继续处理第二个,并且告诉用户,前面有多少个任务需要等待?
我的开发环境,后端是.net7 前端vue3.0,下面是对应安装和使用教程:
1.Hangfire使用

1.安装nuget包

由于我使用的mysql,对应包为Hangfire.MySqlStorage,大家根据自己的数据库选择安装对应的包
  1. [/code][size=5]2.添加Hangfire配置[/size]
  2. [indent]Hangfire的数据是存在数据库中的,所以在添加配置时候要使用对应的数据库连接字符串。同时,在UseHangfireServer时,我使用了自定义的队列名称,并将同时执行的任务数设置为1,以实现任务队列中的任务唯一,且任务依次执行。
  3. [/indent]在program.cs中添加以下配置
  4. [size=4]1.添加Hangfire[/size]
  5. [align=center][/align]
  6. 代码内容:
  7. [code]var connectionString = configuration.GetValue<string>("ConnStr");//数据库连接配置
  8. // Add Hangfire services.
  9. services.AddHangfire(config =>
  10. {
  11.     config.UseStorage(new MySqlStorage(connectionString, new MySqlStorageOptions
  12.     {
  13.         TablesPrefix = "hangfire_", // 指定表前缀
  14.         PrepareSchemaIfNecessary = true // 允许安装 MySQL 表格(如果不存在的话)
  15.         // 其他存储选项
  16.     }));
  17. });
复制代码
2.应用Hangfire


代码内容:
  1. // Use Hangfire server and dashboard.
  2. app.UseHangfireServer(new BackgroundJobServerOptions
  3. {
  4.     Queues = new[] { "default", "img-queue" },
  5.     WorkerCount = 1
  6. });
  7. app.UseHangfireDashboard();// 使用 Hangfire 控制面板
复制代码
3.数据库配置

配置完成,在使用时,数据库会生成Hangfire的工作表,如下:

4.Hangfire 控制面板

对应Hangfire 控制面板为 /hangfire
  1. http://localhost:5122/hangfire
复制代码
1.仪表盘


2.队列


5.代码中的应用

1.发起一个后台任务
  1. //添加后台任务
  2. BackgroundJob.Enqueue(() => BackServiceCreateImg(request));
复制代码
2.后台任务方法
  1. /// <summary>
  2. /// 后台任务生成图片(DisableConcurrentExecution 设置超时时间 Queue设置任务类型)
  3. /// </summary>
  4. /// <param name="request"></param>
  5. /// <returns></returns>
  6. [DisableConcurrentExecution(timeoutInSeconds: 180)]
  7. [Queue("img-queue")]
  8. public async Task BackServiceCreateImg(GraphGenerationRequest request)
  9. {
  10.     //...代码逻辑省略
  11. }
复制代码
3.查询队列等待任务数
  1. var queueLength = JobStorage.Current.GetMonitoringApi()
  2.                             .EnqueuedCount("img-queue");//指定的队列类型的队列等待任务数
复制代码
2.SignalR使用

1.后端SignalR使用

由于我使用的.net7,微软自带SignalR,我们使用时只需要添加引用
  1. using Microsoft.AspNetCore.SignalR;
复制代码
1.添加SignalR配置

在program.cs中添加以下配置
1.添加SignalR


代码内容:
  1. // SignalR
  2. services.AddSignalR();
复制代码
2.配置SignalR hub


代码内容:
  1. // SignalR hub
  2. app.MapHub<GraphGenerationHub>("/graphhub");
复制代码
2.创建SignalR hub类
  1. using Hangfire;
  2. using Microsoft.AspNetCore.Cors;
  3. using Microsoft.AspNetCore.SignalR;
  4. namespace ChatGptWebApi.Hubs
  5. {
  6.     [EnableCors("MyPolicy")]
  7.     public class GraphGenerationHub : Hub
  8.     {
  9.         public GraphGenerationHub()
  10.         {
  11.         }
  12.         public long GetWaitingCount()
  13.         {
  14.             return  JobStorage.Current.GetMonitoringApi()
  15.                 .EnqueuedCount("img-queue");
  16.         }
  17.     }
  18. }
复制代码
3.代码中的应用

1.依赖注入

通过依赖注入,在要使用的类中注入
  1. private readonly IHubContext<GraphGenerationHub> _hubContext;
复制代码
4.发送消息

向全体发送
  1. _hubContext.Clients.All.SendAsync("updateWaitingCount", "消息内容.....");
复制代码
向指定客户端发送
  1. _hubContext.Clients.Client(request.ConnectionId).SendAsync("updateImgUrl", $"生成图片失败:{ex.Message}");
复制代码
2.前端SignalR使用

前端我用的是VUE+TS
1.安装SignalR包

通过命令使用 pnpm 安装 @microsoft/signalr:
  1. pnpm install @microsoft/signalr
复制代码
2.页面中引用@microsoft/signalr
  1. import * as signalR from "@microsoft/signalr";
复制代码
3.创建一个useSignalR.ts

创建一个useSignalR.ts来专门处理SignalR消息,然后在需要用到的页面中引用即可。
代码内容:
  1. import { onUnmounted, ref } from 'vue';
  2. import { useMessage } from 'naive-ui'
  3. import { HubConnectionBuilder, HubConnection } from '@microsoft/signalr';
  4. export  function useSignalR(
  5.   hubUrl: string,
  6.   hubName: string
  7. ) {
  8.   const connection = ref<HubConnection | null>(null);
  9.   const waitingCount = ref(0);
  10.   const imgUrl = ref([]);
  11.   const ms = useMessage();
  12.   const start = async () => {
  13.     if (connection.value && connection.value.state === 'Connected') return;
  14.     connection.value = getConnection(hubUrl);
  15.     if (connection.value) {
  16.      // 连接 SignalR
  17.      connection.value.start()
  18.      .then(() => {
  19.        console.log('SignalR Connected.');
  20.        // 调用 GraphGenerationHub 的 GetWaitingCount 方法获取队列等待数
  21.        connection.value?.invoke('GetWaitingCount')
  22.          .then(count => {
  23.            console.log('Waiting Count:', count);
  24.            waitingCount.value = count;
  25.          });
  26.        // 注册 signalR 接收方法
  27.        connection.value?.on('updateWaitingCount', count => {
  28.          console.log('Waiting Count:', count);
  29.          waitingCount.value = count;
  30.        });
  31.        connection.value?.on('updateImgUrl', newImgUrl => {
  32.         console.log('Waiting imgUrl:', newImgUrl);
  33.         if(typeof newImgUrl === 'string'){
  34.           ms.error(newImgUrl);
  35.         }else{
  36.           ms.success('图片生成成功。');
  37.         imgUrl.value = newImgUrl;
  38.         }
  39.       });
  40.      })
  41.      .catch(error => {
  42.        console.log('SignalR Connection Error:', error);
  43.      });
  44.     }
  45.   };
  46.   
  47.   const stop = () => {
  48.     connection.value!.stop();
  49.     connection.value = null;
  50.   };
  51.   const getConnection = (
  52.     hubUrl: string
  53.   ): HubConnection => {
  54.     return new HubConnectionBuilder()
  55.       .withUrl(hubUrl)
  56.       .withAutomaticReconnect().build();
  57.   };
  58.   start();
  59.   onUnmounted(() => {
  60.     if (connection.value?.state === 'Connected') connection.value!.stop();
  61.   });
  62.   return {
  63.     connection,
  64.     waitingCount,
  65.     imgUrl,
  66.     start,
  67.     stop
  68.   };
  69. }
复制代码
4.页面中的使用

在需要使用signalR的页面引用useSignalR
  1. import {useSignalR} from '@/views/chat/hooks/useSignalR';
复制代码
  1. setup() {
  2. //signalR
  3. const { waitingCount,connection,imgUrl } = useSignalR(apiBaseUrl+'/graphhub');
  4. }
复制代码
3.案例:SignalR+Hangfire+StableDiffusionAPI 生成图片

Hangfire实现后台调用StableDiffusion web接口,然后通过SignalR将结果返回给前端。这样,对StableDiffusion web的性能要求很低。不会因为生成图片慢,导致http请求超时的情况。大大改善了前后端交互。
1.前端建立SignalR

入上述页面中使用介绍的一样,当添加了
  1. const { waitingCount,connection,imgUrl } = useSignalR(apiBaseUrl+'/graphhub');
复制代码
打开对应页面时,就创建了SignalR的连接了。
2.前端发起请求

前端的提交按钮对应的方法,使用的是axios发送http请求生成图片。
代码如下:
  1. const submit = async () => {
  2.         const params = {
  3.           Prompt: description.value,
  4.           connectionId:connection.value?.connectionId //SignalR的客户端连接ID
  5.         };
  6.       try {
  7.       //signalR
  8.       const response = await axios.post(apiUrl+'/GenerateGraph', params);
  9.       if(response.data.status ==='Fail'){
  10.         ms.error(response.data.message ?? 'error')
  11.       return
  12.       }
  13.       usedCount.value=response.data.data;
  14.       ms.success(response.data.message);
  15.     } catch (error) {
  16.       ms.error('报错拉!:'+error);
  17.     }
  18.     console.log("提交的参数:", params); // 在控制台输出提交的参数
  19.   };
复制代码
3.后端接口和实现

后端接口和实现方法完成定时任务的发起和signalR的消息推送
后端接口如下:
  1. /// <summary>
  2. /// signalR+hangfire生成图片
  3. /// </summary>
  4. /// <param name="request"></param>
  5. /// <returns></returns>
  6. [HttpPost]
  7. public async Task<ApiResult<int?>> GenerateGraph(GraphGenerationRequest request)
  8. {
  9.     var res=await _iGptImage.GenerateGraph(request);
  10.     return res;
  11. }
复制代码
方法实现:
  1. /// <summary>
  2. /// 生成图片,返回队列信息和剩余次数
  3. /// </summary>
  4. /// <param name="request"></param>
  5. /// <returns></returns>
  6. /// <exception cref="NotImplementedException"></exception>
  7. public async Task<ApiResult<int?>> GenerateGraph(Form.GraphGenerationRequest request)
  8. {
  9.     //添加后台任务
  10.     BackgroundJob.Enqueue(() => BackServiceCreateImg(request));
  11.     string message = await SendWaitingCount("img-queue");
  12.     return new ApiResult<int?>(HttpResultTypeEnum.Success, count - 1, message);
  13. }
  14. /// <summary>
  15. /// 推送队列的等待信息
  16. /// </summary>
  17. /// <param name="enqueue">任务类型</param>
  18. /// <returns></returns>
  19. private async Task<string> SendWaitingCount(string enqueue)
  20. {
  21.     var queueLength = JobStorage.Current.GetMonitoringApi()
  22.         .EnqueuedCount(enqueue);
  23.     string message = $"任务已提交,您前面还有 {queueLength} 个任务正在等待。";
  24.     await _hubContext.Clients.All.SendAsync("updateWaitingCount", queueLength);
  25.     return message;
  26. }
复制代码
4.案例成果

案例地址(AI聊天+图片生成):https://ai.terramours.site/

阅读如遇样式问题,请前往个人博客浏览: https://www.raokun.top

拥抱ChatGPT:https://ai.terramours.site

开源项目地址:https://github.com/firstsaofan/TerraMours


来源:https://www.cnblogs.com/raok/archive/2023/06/06/17462105.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具