翼度科技»论坛 编程开发 .net 查看内容

使用Blazor WASM实现可取消的多文件带校验并发分片上传

6

主题

6

帖子

18

积分

新手上路

Rank: 1

积分
18
前言

上传大文件时,原始HTTP文件上传功能可能会影响使用体验,此时使用分片上传功能可以有效避免原始上传的弊端。由于分片上传不是HTTP标准的一部分,所以只能自行开发相互配合的服务端和客户端。文件分片上传在许多情况时都拥有很多好处,除非已知需要上传的文件一定非常小。分片上传可以对上传的文件进行快速分片校验,避免大文件上传时长时间等待校验,当然完整校验可以在秒传时使用,有这种需求的情况就只能老实等待校验了。
Blazr WASM提供了在 .NET环境中使用浏览器功能的能力,充分利用C#和 .NET能够大幅简化分片上传功能的开发。本次示例使用HTTP标准上传作为分片上传的底层基础,并提供分片校验功能保障上传数据的完整性。
新书宣传

有关新书的更多介绍欢迎查看《C#与.NET6 开发从入门到实践》上市,作者亲自来打广告了!

正文

本示例的Blazor代码位于默认ASP.NET Core托管的Blazor WASM应用模板的Index页面。
在Shared项目添加公共数据模型
  1. /// <summary>
  2. /// 文件分片上传输入模型
  3. /// </summary>
  4. public class FileChunkUploadInput
  5. {
  6.     /// <summary>
  7.     /// 上传任务代码
  8.     /// </summary>
  9.     public string? UploadTaskCode { get; set; }
  10.     /// <summary>
  11.     /// 上传请求类型
  12.     /// </summary>
  13.     public string UploadType { get; set; } = null!;
  14.     /// <summary>
  15.     /// 文件名
  16.     /// </summary>
  17.     public string FileName { get; set; } = null!;
  18.     /// <summary>
  19.     /// 文件大小
  20.     /// </summary>
  21.     public long? FileSize { get; set; }
  22.     /// <summary>
  23.     /// 支持的Hash算法,优选算法请靠前
  24.     /// </summary>
  25.     public List<string>? AllowedHashAlgorithm { get; set; }
  26.     /// <summary>
  27.     /// 使用的Hash算法
  28.     /// </summary>
  29.     public string? HashAlgorithm { get; set; }
  30.     /// <summary>
  31.     /// Hash值
  32.     /// </summary>
  33.     public string? HashValue { get; set; }
  34.     /// <summary>
  35.     /// 文件分片数量
  36.     /// </summary>
  37.     public int FileChunkCount { get; set; }
  38.     /// <summary>
  39.     /// 文件片段大小
  40.     /// </summary>
  41.     public int? FileChunkSize { get; set; }
  42.     /// <summary>
  43.     /// 文件片段偏移量(相对于整个文件)
  44.     /// </summary>
  45.     public long? FileChunkOffset { get; set; }
  46.     /// <summary>
  47.     /// 文件片段索引
  48.     /// </summary>
  49.     public int? FileChunkIndex { get; set; }
  50.     /// <summary>
  51.     /// 取消上传的原因
  52.     /// </summary>
  53.     public string? CancelReason { get; set; }
  54. }
  55. /// <summary>
  56. /// 文件分片上传开始结果
  57. /// </summary>
  58. public class FileChunkUploadStartReault
  59. {
  60.     /// <summary>
  61.     /// 上传任务代码
  62.     /// </summary>
  63.     public string UploadTaskCode { get; set; } = null!;
  64.     /// <summary>
  65.     /// 选中的Hash算法
  66.     /// </summary>
  67.     public string SelectedHashAlgorithm { get; set; } = null!;
  68. }
  69. /// <summary>
  70. /// Hash助手
  71. /// </summary>
  72. public static class HashHelper
  73. {
  74.     /// <summary>
  75.     /// 把Hash的字节数组转换为16进制字符串表示
  76.     /// </summary>
  77.     /// <param name="bytes">原始Hash值</param>
  78.     /// <returns>Hash值的16进制文本表示(大写)</returns>
  79.     public static string ToHexString(this byte[] bytes)
  80.     {
  81.         StringBuilder sb = new(bytes.Length * 2);
  82.         foreach (var @byte in bytes)
  83.         {
  84.             sb.Append(@byte.ToString("X2"));
  85.         }
  86.         return sb.ToString();
  87.     }
  88. }
复制代码
服务端控制器
  1. [ApiController]
  2. [Route("[controller]")]
  3. public class UploadController : ControllerBase
  4. {
  5.     /// <summary>
  6.     /// 支持的Hash算法,优选算法请靠前
  7.     /// </summary>
  8.     private static string[] supportedHashAlgorithm = new[] { "MD5", "SHA1", "SHA256" };
  9.     /// <summary>
  10.     /// 文件写入锁的线程安全字典,每个上传任务对应一把锁
  11.     /// </summary>
  12.     private static readonly ConcurrentDictionary<string, AsyncLock> fileWriteLockerDict = new();
  13.     private readonly ILogger<UploadController> _logger;
  14.     private readonly IWebHostEnvironment _env;
  15.     public UploadController(ILogger<UploadController> logger, IWebHostEnvironment env)
  16.     {
  17.         _logger = logger;
  18.         _env = env;
  19.     }
  20.     /// <summary>
  21.     /// 分片上传动作
  22.     /// </summary>
  23.     /// <param name="input">上传表单</param>
  24.     /// <param name="fileChunkData">文件片段数据</param>
  25.     /// <param name="requestAborted">请求取消令牌</param>
  26.     /// <returns>片段上传结果</returns>
  27.     [HttpPost, RequestSizeLimit(1024 * 1024 * 11)]
  28.     [ProducesResponseType(StatusCodes.Status200OK)]
  29.     [ProducesResponseType(StatusCodes.Status400BadRequest)]
  30.     [ProducesDefaultResponseType]
  31.     public async Task<IActionResult> Upload(
  32.         [FromForm]FileChunkUploadInput input,
  33.         [FromForm]IFormFile? fileChunkData,
  34.         CancellationToken requestAborted)
  35.     {
  36.         switch (input.UploadType)
  37.         {
  38.             // 请求开始一个新的上传任务,协商上传参数
  39.             case "startUpload":
  40.                 {
  41.                     //var trustedFileNameForDisplay =
  42.                     //    WebUtility.HtmlEncode(fileChunkData?.FileName ?? input.FileName);
  43.                     // 选择双方都支持的优选Hash算法
  44.                     var selectedHashAlgorithm = supportedHashAlgorithm
  45.                         .Intersect(input.AllowedHashAlgorithm ?? Enumerable.Empty<string>())
  46.                         .FirstOrDefault();
  47.                     // 验证必要的表单数据
  48.                     if (selectedHashAlgorithm is null or "")
  49.                     {
  50.                         ModelState.AddModelError<FileChunkUploadInput>(x => x.AllowedHashAlgorithm, "can not select hash algorithm");
  51.                     }
  52.                     if (input.FileSize is null)
  53.                     {
  54.                         ModelState.AddModelError<FileChunkUploadInput>(x => x.FileSize, "must have value for start、upload and complete");
  55.                     }
  56.                     if (ModelState.ErrorCount > 0)
  57.                     {
  58.                         return ValidationProblem(ModelState);
  59.                     }
  60.                     // 使用随机文件名提高安全性,并把文件名作为任务代码使用
  61.                     var trustedFileNameForFileStorage = Path.GetRandomFileName();
  62.                     var savePath = Path.Combine(
  63.                         _env.ContentRootPath,
  64.                         _env.EnvironmentName,
  65.                         "unsafe_uploads",
  66.                         trustedFileNameForFileStorage);
  67.                     var savePathWithFile = Path.Combine(
  68.                         savePath,
  69.                         $"{input.FileName}.tmp");
  70.                     if (!Directory.Exists(savePath))
  71.                     {
  72.                         Directory.CreateDirectory(savePath);
  73.                     }
  74.                     // 根据表单创建对应大小的文件
  75.                     await using (var fs = new FileStream(savePathWithFile, FileMode.Create))
  76.                     {
  77.                         fs.SetLength(input.FileSize!.Value);
  78.                         await fs.FlushAsync();
  79.                     }
  80.                     // 设置锁
  81.                     fileWriteLockerDict.TryAdd(trustedFileNameForFileStorage, new());
  82.                     // 返回协商结果
  83.                     return Ok(new FileChunkUploadStartReault
  84.                     {
  85.                         UploadTaskCode = trustedFileNameForFileStorage,
  86.                         SelectedHashAlgorithm = selectedHashAlgorithm!
  87.                     });
  88.                 }
  89.             // 上传文件片段
  90.             case "uploadChunk":
  91.                 // 验证表单
  92.                 if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _))
  93.                 {
  94.                     ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists");
  95.                     return ValidationProblem(ModelState);
  96.                 }
  97.                 // 使用内存池缓冲数据,注意使用using释放内存
  98.                 using (var pooledMemory = MemoryPool<byte>.Shared.Rent((int)fileChunkData!.Length))
  99.                 {
  100.                     // 使用切片语法获取精准大小的内存缓冲区装载上传的数据
  101.                     var buffer = pooledMemory.Memory[..(int)fileChunkData!.Length];
  102.                     var readBytes = await fileChunkData.OpenReadStream().ReadAsync(buffer, requestAborted);
  103.                     var readBuffer = buffer[..readBytes];
  104.                     Debug.Assert(readBytes == fileChunkData!.Length);
  105.                     // 校验Hash
  106.                     var hash = input.HashAlgorithm switch
  107.                     {
  108.                         "SHA1" => SHA1.HashData(readBuffer.Span),
  109.                         "SHA256" => SHA256.HashData(readBuffer.Span),
  110.                         "MD5" => MD5.HashData(readBuffer.Span),
  111.                         _ => Array.Empty<byte>()
  112.                     };
  113.                     if (hash.ToHexString() != input.HashValue)
  114.                     {
  115.                         ModelState.AddModelError<FileChunkUploadInput>(x => x.HashValue, "hash does not match");
  116.                         return ValidationProblem(ModelState);
  117.                     }
  118.                     var savePath = Path.Combine(
  119.                         _env.ContentRootPath,
  120.                         _env.EnvironmentName,
  121.                         "unsafe_uploads",
  122.                         input.UploadTaskCode!);
  123.                     var savePathWithFile = Path.Combine(
  124.                         savePath,
  125.                         $"{input.FileName}.tmp");
  126.                     // 使用锁写入数据,文件流不支持写共享,必须串行化
  127.                     if(fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var locker))
  128.                     {
  129.                         using (await locker.LockAsync())
  130.                         {
  131.                             await using (var fs = new FileStream(savePathWithFile, FileMode.Open, FileAccess.Write))
  132.                             {
  133.                                 // 定位文件流
  134.                                 fs.Seek(input.FileChunkOffset!.Value, SeekOrigin.Begin);
  135.                                 await fs.WriteAsync(readBuffer, requestAborted);
  136.                                 await fs.FlushAsync();
  137.                             }
  138.                         }
  139.                     }
  140.                 }
  141.                 return Ok();
  142.             // 取消上传
  143.             case "cancelUpload":
  144.                 // 验证表单
  145.                 if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _))
  146.                 {
  147.                     ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists");
  148.                     return ValidationProblem(ModelState);
  149.                 }
  150.                 {
  151.                     var deletePath = Path.Combine(
  152.                         _env.ContentRootPath,
  153.                         _env.EnvironmentName,
  154.                         "unsafe_uploads",
  155.                         input.UploadTaskCode!);
  156.                     // 删除文件,清除锁
  157.                     if (fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var locker))
  158.                     {
  159.                         using (await locker.LockAsync())
  160.                         {
  161.                             if (Directory.Exists(deletePath))
  162.                             {
  163.                                 var dir = new DirectoryInfo(deletePath);
  164.                                 dir.Delete(true);
  165.                             }
  166.                             fileWriteLockerDict.TryRemove(input.UploadTaskCode!, out _);
  167.                         }
  168.                     }
  169.                 }
  170.                 return Ok();
  171.             // 完成上传
  172.             case "completeUpload":
  173.                 // 验证表单
  174.                 if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _))
  175.                 {
  176.                     ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists");
  177.                     return ValidationProblem(ModelState);
  178.                 }
  179.                 {
  180.                     var savePath = Path.Combine(
  181.                         _env.ContentRootPath,
  182.                         _env.EnvironmentName,
  183.                         "unsafe_uploads",
  184.                         input.UploadTaskCode!);
  185.                     // 去除文件的临时扩展名,清除锁
  186.                     var savePathWithFile = Path.Combine(savePath, $"{input.FileName}.tmp");
  187.                     var fi = new FileInfo(savePathWithFile);
  188.                     fi.MoveTo(Path.Combine(savePath, input.FileName));
  189.                     fileWriteLockerDict.TryRemove(input.UploadTaskCode!, out _);
  190.                 }
  191.                 return Ok();
  192.             default:
  193.                 return BadRequest();
  194.         }
  195.     }
  196. }
复制代码
服务端使用三段式上传模式,开始上传,上传数据,完成(取消)上传。开始上传负责协商Hash算法和分配任务代码;上传数据负责具体的传输,并通过表单提供附加信息方便服务端操作。完成上传负责善后和资源清理。其中文件写入的异步锁使用Nito.AsyncEx代替不支持在异步中使用的lock语句。
页面代码(Index.razor),在结尾追加
  1. <p>支持随时取消的多文件并行分片上传,示例同时上传2个文件,每个文件同时上传2个分片,合计同时上传4个分片</p>
  2. <InputFile OnChange="UploadFile" multiple></InputFile>
  3. <button @onclick="async (MouseEventArgs e) => uploadCancelSource?.Cancel()">取消上传</button>
  4. @code{
  5.     [Inject] private HttpClient _http { get; init; } = null!;
  6.     [Inject] private ILogger<Index> _logger { get; init; } = null!;
  7.     private CancellationTokenSource? uploadCancelSource;
  8.     /// <summary>
  9.     /// 上传文件
  10.     /// </summary>
  11.     /// <param name="args">上传文件的事件参数</param>
  12.     /// <returns></returns>
  13.     private async Task UploadFile(InputFileChangeEventArgs args)
  14.     {
  15.         // 设置文件并发选项
  16.         var parallelCts = new CancellationTokenSource();
  17.         uploadCancelSource = parallelCts;
  18.         var parallelOption = new ParallelOptions
  19.         {
  20.             MaxDegreeOfParallelism = 2,
  21.             CancellationToken = parallelCts.Token
  22.         };
  23.         // 并发上传所有文件
  24.         await Parallel.ForEachAsync(
  25.             args.GetMultipleFiles(int.MaxValue),
  26.             parallelOption,
  27.             async (file, cancellation) =>
  28.             {
  29.                 // 这里的取消令牌是并发方法创建的,和并发选项里的令牌不是一个
  30.                 if (cancellation.IsCancellationRequested)
  31.                 {
  32.                     parallelCts.Cancel();
  33.                     return;
  34.                 }
  35.                 // 使用链接令牌确保外部取消能传递到内部
  36.                 var chunkUploadResult = await UploadChunkedFile(
  37.                     file,
  38.                     CancellationTokenSource.CreateLinkedTokenSource(
  39.                         parallelCts.Token,
  40.                         cancellation
  41.                     ).Token
  42.                 );
  43.                 // 如果上传不成功则取消后续上传
  44.                 if (chunkUploadResult != FileUploadResult.Success)
  45.                 {
  46.                     parallelCts.Cancel();
  47.                     return;
  48.                 }
  49.             }
  50.         );
  51.     }
  52.     /// <summary>
  53.     /// 分片上传文件
  54.     /// </summary>
  55.     /// <param name="file">要上传的文件</param>
  56.     /// <param name="cancellation">取消令牌</param>
  57.     /// <returns>上传结果</returns>
  58.     private async Task<FileUploadResult> UploadChunkedFile(IBrowserFile file, CancellationToken cancellation = default)
  59.     {
  60.         if (cancellation.IsCancellationRequested) return FileUploadResult.Canceled;
  61.         _logger.LogInformation("开始上传文件:{0}", file.Name);
  62.         // 计算分片大小,文件小于10MB分片1MB,大于100MB分片10MB,在其间则使用不超过10片时的所需大小
  63.         var coefficient = file.Size switch
  64.         {
  65.             <= 1024 * 1024 * 10 => 1,
  66.             > 1024 * 1024 * 10 and <= 1024 * 1024 *100 => (int)Math.Ceiling(file.Size / (1024.0 * 1024) / 10),
  67.             _ => 10
  68.         };
  69.         // 初始化分片参数,准备字符串格式的数据供表单使用
  70.         var bufferSize = 1024 * 1024 * coefficient; // MB
  71.         var stringBufferSize = bufferSize.ToString();
  72.         var chunkCount = (int)Math.Ceiling(file.Size / (double)bufferSize);
  73.         var stringChunkCount = chunkCount.ToString();
  74.         var stringFileSize = file.Size.ToString();
  75.         // 发起分片上传,协商Hash算法,获取任务代码
  76.         var uploadStartContent = new List<KeyValuePair<string, string>>
  77.         {
  78.             new("uploadType", "startUpload"),
  79.             new("fileName", file.Name),
  80.             new("fileSize", stringFileSize),
  81.             new("allowedHashAlgorithm", "SHA1"),
  82.             new("allowedHashAlgorithm", "SHA256"),
  83.             new("fileChunkCount", stringChunkCount),
  84.             new("fileChunkSize", stringBufferSize),
  85.         };
  86.         var uploadStartForm = new FormUrlEncodedContent(uploadStartContent);
  87.         HttpResponseMessage? uploadStartResponse = null;
  88.         try
  89.         {
  90.             uploadStartResponse = await _http.PostAsync("/upload", uploadStartForm, cancellation);
  91.         }
  92.         catch(TaskCanceledException e)
  93.         {
  94.             _logger.LogWarning(e, "外部取消上传,已停止文件:{0} 的上传", file.Name);
  95.             return FileUploadResult.Canceled;
  96.         }
  97.         catch(Exception e)
  98.         {
  99.             _logger.LogError(e, "文件:{0} 的上传参数协商失败", file.Name);
  100.             return FileUploadResult.Fail;
  101.         }
  102.         // 如果服务器响应失败,结束上传
  103.         if (uploadStartResponse?.IsSuccessStatusCode is null or false)
  104.         {
  105.             _logger.LogError("文件:{0} 的上传参数协商失败", file.Name);
  106.             return FileUploadResult.Fail;
  107.         }
  108.         // 解析协商的参数
  109.         var uploadStartReault = await uploadStartResponse.Content.ReadFromJsonAsync<FileChunkUploadStartReault>();
  110.         var uploadTaskCode = uploadStartReault!.UploadTaskCode;
  111.         var selectedHashAlgorithm = uploadStartReault!.SelectedHashAlgorithm;
  112.         _logger.LogInformation("文件:{0} 的上传参数协商成功", file.Name);
  113.         // 设置分片并发选项
  114.         var parallelOption = new ParallelOptions
  115.         {
  116.             MaxDegreeOfParallelism = 2,
  117.         };
  118.         var fileUploadCancelSource = new CancellationTokenSource();
  119.         var sliceEnumeratorCancelSource = CancellationTokenSource
  120.             .CreateLinkedTokenSource(
  121.                 cancellation,
  122.                 fileUploadCancelSource.Token
  123.             );
  124.         // 各个分片的上传结果
  125.         var sliceUploadResults = new FileUploadResult?[chunkCount];
  126.         // 并发上传各个分片,并发循环本身不能用并发选项的取消令牌取消,可能会导致内存泄漏,应该通过切片循环的取消使并发循环因没有可用元素自然结束
  127.         await Parallel.ForEachAsync(
  128.             SliceFileAsync(
  129.                 file,
  130.                 bufferSize,
  131.                 sliceEnumeratorCancelSource.Token
  132.             ),
  133.             parallelOption,
  134.             async (fileSlice, sliceUploadCancel) =>
  135.             {
  136.                 // 解构参数
  137.                 var (memory, sliceIndex, readBytes, fileOffset) = fileSlice;
  138.                 // 使用using确保结束后把租用的内存归还给内存池
  139.                 using (memory)
  140.                 {
  141.                     var stringSliceIndex = sliceIndex.ToString();
  142.                     // 主动取消上传,发送取消请求,通知服务端清理资源
  143.                     if (sliceUploadCancel.IsCancellationRequested)
  144.                     {
  145.                         _logger.LogWarning("外部取消上传,已停止文件:{0} 的上传", file.Name);
  146.                         fileUploadCancelSource.Cancel();
  147.                         sliceUploadResults[sliceIndex] = FileUploadResult.Canceled;
  148.                         var uploadCancelContent = new Dictionary<string, string>()
  149.                         {
  150.                             {"uploadType", "cancelUpload"},
  151.                             {"uploadTaskCode", uploadTaskCode!},
  152.                             {"fileName", file.Name},
  153.                             {"hashAlgorithm", selectedHashAlgorithm},
  154.                             {"fileChunkCount", stringChunkCount},
  155.                             {"fileChunkIndex", stringSliceIndex},
  156.                             {"cancelReason", "调用方要求取消上传。"},
  157.                         };
  158.                         var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
  159.                         var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);
  160.                         return;
  161.                     }
  162.                     // 当前上传分片索引应当小于预计的分片数
  163.                     Debug.Assert(sliceIndex < chunkCount);
  164.                     // 获取准确大小的缓冲区,从内存池租用时得到的容量可能大于申请的大小,使用C#的新集合切片语法
  165.                     var readBuffer = memory.Memory[..readBytes];
  166.                     var sw = Stopwatch.StartNew();
  167.                     // 根据协商的算法计算Hash,wasm环境不支持MD5和全部非对称加密算法
  168.                     var hash = selectedHashAlgorithm switch
  169.                     {
  170.                         "SHA1" => SHA1.HashData(readBuffer.Span),
  171.                         "SHA256" => SHA256.HashData(readBuffer.Span),
  172.                         _ => Array.Empty<byte>()
  173.                     };
  174.                     sw.Stop();
  175.                     _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 计算Hash用时 {3}", file.Name, sliceIndex, readBytes, sw.Elapsed);
  176.                     var stringReadBytes = readBytes.ToString();
  177.                     var stringFileOffset = fileOffset.ToString();
  178.                     // 上传当前分片
  179.                     MultipartFormDataContent uploadFileForm = new();
  180.                     uploadFileForm.Add(new StringContent(uploadTaskCode!), "uploadTaskCode");
  181.                     uploadFileForm.Add(new StringContent("uploadChunk"), "uploadType");
  182.                     uploadFileForm.Add(new StringContent(file.Name), "fileName");
  183.                     uploadFileForm.Add(new StringContent(stringFileSize), "fileSize");
  184.                     uploadFileForm.Add(new StringContent(selectedHashAlgorithm!), "hashAlgorithm");
  185.                     uploadFileForm.Add(new StringContent(hash.ToHexString()), "hashValue");
  186.                     uploadFileForm.Add(new StringContent(stringChunkCount), "fileChunkCount");
  187.                     uploadFileForm.Add(new StringContent(stringReadBytes), "fileChunkSize");
  188.                     uploadFileForm.Add(new StringContent(stringFileOffset), "fileChunkOffset");
  189.                     uploadFileForm.Add(new StringContent(stringSliceIndex), "fileChunkIndex");
  190.                     // 如果是未知的文件类型,设置为普通二进制流的MIME类型
  191.                     var fileChunk = new ReadOnlyMemoryContent(readBuffer);
  192.                     fileChunk.Headers.ContentType = new MediaTypeHeaderValue(string.IsNullOrEmpty(file.ContentType) ? "application/octet-stream" : file.ContentType);
  193.                     uploadFileForm.Add(fileChunk, "fileChunkData", file.Name);
  194.                     HttpResponseMessage? uploadResponse = null;
  195.                     try
  196.                     {
  197.                         var uploadTaskCancel = CancellationTokenSource
  198.                             .CreateLinkedTokenSource(
  199.                                 sliceUploadCancel,
  200.                                 sliceEnumeratorCancelSource.Token
  201.                             );
  202.                         _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 开始上传", file.Name, sliceIndex, readBytes);
  203.                         sw.Restart();
  204.                         uploadResponse = await _http.PostAsync("/upload", uploadFileForm, uploadTaskCancel.Token);
  205.                     }
  206.                     catch (TaskCanceledException e)
  207.                     {
  208.                         _logger.LogWarning(e, "外部取消上传,已停止文件:{0} 的上传", file.Name);
  209.                         fileUploadCancelSource.Cancel();
  210.                         sliceUploadResults[sliceIndex] = FileUploadResult.Canceled;
  211.                         var uploadCancelContent = new Dictionary<string, string>()
  212.                         {
  213.                             {"uploadType", "cancelUpload"},
  214.                             {"uploadTaskCode", uploadTaskCode!},
  215.                             {"fileName", file.Name},
  216.                             {"hashAlgorithm", selectedHashAlgorithm},
  217.                             {"fileChunkCount", stringChunkCount},
  218.                             {"fileChunkIndex", stringSliceIndex},
  219.                             {"cancelReason", "调用方要求取消上传。"},
  220.                         };
  221.                         var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
  222.                         var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);
  223.                         return;
  224.                     }
  225.                     catch (Exception e)
  226.                     {
  227.                         _logger.LogError(e, "上传发生错误,已停止文件:{0} 的上传", file.Name);
  228.                         fileUploadCancelSource.Cancel();
  229.                         sliceUploadResults[sliceIndex] = FileUploadResult.Fail;
  230.                         var uploadCancelContent = new Dictionary<string, string>()
  231.                         {
  232.                             {"uploadType", "cancelUpload"},
  233.                             {"uploadTaskCode", uploadTaskCode!},
  234.                             {"fileName", file.Name},
  235.                             {"hashAlgorithm", selectedHashAlgorithm},
  236.                             {"fileChunkCount", stringChunkCount},
  237.                             {"fileChunkSize", stringReadBytes},
  238.                             {"fileChunkOffset", stringFileOffset},
  239.                             {"fileChunkIndex", stringSliceIndex},
  240.                             {"cancelReason", "上传过程中发生错误。"},
  241.                         };
  242.                         var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
  243.                         var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);
  244.                         return;
  245.                     }
  246.                     finally
  247.                     {
  248.                         sw.Stop();
  249.                     }
  250.                     // 上传发生错误,发送取消请求,通知服务端清理资源
  251.                     if (uploadResponse?.IsSuccessStatusCode is null or false)
  252.                     {
  253.                         _logger.LogError("上传发生错误,已停止文件:{0} 的上传", file.Name);
  254.                         fileUploadCancelSource.Cancel();
  255.                         sliceUploadResults[sliceIndex] = FileUploadResult.Fail;
  256.                         var uploadCancelContent = new Dictionary<string, string>()
  257.                         {
  258.                             {"uploadType", "cancelUpload"},
  259.                             {"uploadTaskCode", uploadTaskCode!},
  260.                             {"fileName", file.Name},
  261.                             {"hashAlgorithm", selectedHashAlgorithm},
  262.                             {"fileChunkCount", stringChunkCount},
  263.                             {"fileChunkSize", stringReadBytes},
  264.                             {"fileChunkOffset", stringFileOffset},
  265.                             {"fileChunkIndex", stringSliceIndex},
  266.                             {"cancelReason", "上传过程中发生错误。"},
  267.                         };
  268.                         var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
  269.                         var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);
  270.                         return;
  271.                     }
  272.                     _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 上传成功,用时 {3}", file.Name, sliceIndex, readBytes, sw.Elapsed);
  273.                     sliceUploadResults[sliceIndex] = FileUploadResult.Success;
  274.                 }
  275.             }
  276.         );
  277.         // 如果所有分片都上传成功,则发送完成请求完成上传
  278.         if (sliceUploadResults.All(success => success is FileUploadResult.Success))
  279.         {
  280.             var uploadCompleteContent = new Dictionary<string, string>()
  281.             {
  282.                 {"uploadType", "completeUpload"},
  283.                 {"uploadTaskCode", uploadTaskCode!},
  284.                 {"fileName", file.Name},
  285.                 {"fileSize", stringFileSize},
  286.                 {"hashAlgorithm", selectedHashAlgorithm},
  287.                 {"fileChunkCount", stringChunkCount},
  288.                 {"fileChunkSize", stringBufferSize},
  289.             };
  290.             var uploadCompleteForm = new FormUrlEncodedContent(uploadCompleteContent);
  291.             var uploadCompleteResponse = await _http.PostAsync("/upload", uploadCompleteForm);
  292.             if (uploadCompleteResponse.IsSuccessStatusCode)
  293.             {
  294.                 _logger.LogInformation("文件:{0} 上传成功,共 {1} 个片段", file.Name, chunkCount);
  295.                 return FileUploadResult.Success;
  296.             }
  297.             else
  298.             {
  299.                 _logger.LogError("上传发生错误,已停止文件:{0} 的上传", file.Name);
  300.                 var uploadCancelContent = new Dictionary<string, string>()
  301.                 {
  302.                     {"uploadType", "cancelUpload"},
  303.                     {"uploadTaskCode", uploadTaskCode!},
  304.                     {"fileName", file.Name},
  305.                     {"hashAlgorithm", selectedHashAlgorithm},
  306.                     {"fileChunkCount", stringChunkCount},
  307.                     {"cancelReason", "上传过程中发生错误。"},
  308.                 };
  309.                 var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent);
  310.                 var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm);
  311.                 return FileUploadResult.Fail;
  312.             }
  313.         }
  314.         else if (sliceUploadResults.Any(success => success is FileUploadResult.Fail))
  315.         {
  316.             return FileUploadResult.Fail;
  317.         }
  318.         else
  319.         {
  320.             return FileUploadResult.Canceled;
  321.         }
  322.     }
  323.     /// <summary>
  324.     /// 异步切分要上传的文件
  325.     /// <br/>如果想中途结束切分,不要在调用此方法的foreach块中使用break,请使用取消令牌,否则会出现内存泄漏
  326.     /// </summary>
  327.     /// <param name="file">要分片的文件</param>
  328.     /// <param name="sliceSize">分片大小</param>
  329.     /// <param name="cancellation">取消令牌</param>
  330.     /// <returns>已切分的文件片段数据,用完切记释放其中的内存缓冲</returns>
  331.     private static async IAsyncEnumerable<(IMemoryOwner<byte> memory, int sliceIndex, int readBytes, long fileOffset)> SliceFileAsync(
  332.         IBrowserFile file,
  333.         int sliceSize,
  334.         [EnumeratorCancellation] CancellationToken cancellation = default)
  335.     {
  336.         if (cancellation.IsCancellationRequested) yield break;
  337.         int fileSliceIndex;
  338.         long fileOffset;
  339.         IMemoryOwner<byte> memory;
  340.         await using var fileStream = file.OpenReadStream(long.MaxValue);
  341.         for (fileSliceIndex = 0, fileOffset = 0, memory = MemoryPool<byte>.Shared.Rent(sliceSize);
  342.             (await fileStream.ReadAsync(memory.Memory[..sliceSize], cancellation)) is int readBytes and > 0;
  343.             fileSliceIndex++, fileOffset += readBytes, memory = MemoryPool<byte>.Shared.Rent(sliceSize)
  344.         )
  345.         {
  346.             if(cancellation.IsCancellationRequested)
  347.             {
  348.                 // 如果取消切分,缓冲不会返回到外部,只能在内部释放
  349.                 memory.Dispose();
  350.                 yield break;
  351.             }
  352.             yield return (memory, fileSliceIndex, readBytes, fileOffset);
  353.         }
  354.         // 切分结束后会多出一个没用的缓冲,只能在内部释放
  355.         memory.Dispose();
  356.     }
  357.     /// <summary>
  358.     /// 上传结果
  359.     /// </summary>
  360.     public enum FileUploadResult
  361.     {
  362.         /// <summary>
  363.         /// 失败
  364.         /// </summary>
  365.         Fail = -2,
  366.         /// <summary>
  367.         /// 取消
  368.         /// </summary>
  369.         Canceled = -1,
  370.         /// <summary>
  371.         /// 没有结果,未知结果
  372.         /// </summary>
  373.         None = 0,
  374.         /// <summary>
  375.         /// 成功
  376.         /// </summary>
  377.         Success = 1
  378.     }
  379. }
复制代码
示例使用Parallel.ForEachAsync方法并行启动多个文件和每个文件的多个片段的上传,并发量由方法的参数控制。UploadChunkedFile方法负责单个文件的上传,其中的IBrowserFile类型是.NET 6新增的文件选择框选中项的包装,可以使用其中的OpenReadStream方法流式读取文件数据,确保大文件上传不会在内存中缓冲所有数据导致内存占用问题。
UploadChunkedFile方法内部使用自适应分片大小算法,规则为片段最小1MB,最大10MB,尽可能平均分为10份。得出片段大小后向服务端请求开始上传文件,服务端成功返回后开始文件切分、校验和上传。
SliceFileAsync负责切分文件并流式返回每个片段,切分方法是惰性的,所以不用担心占用大量内存,但是这个方法只能使用取消令牌中断切分,如果在调用该方法的await foreach块中使用break中断会产生内存泄漏。切分完成后会返回包含片段数据的内存缓冲和其他附加信息。OpenReadStream需要使用参数控制允许读取的最大字节数(默认512KB),因为这里是分片上传,直接设置为long.MaxValue即可。for循环头使用逗号表达式定义多个循环操作,使循环体的代码清晰简洁。
UploadChunkedFile方法使用Parallel.ForEachAsync并行启动多个片段的校验和上传,WASM中不支持MD5和所有非对称加密算法,需要注意。完成文件的并行上传或发生错误后会检查所有片段的上传情况,如果所有片段都上传成功,就发送完成上传请求通知服务端收尾善后,否则删除临时文件。
结语

这应该是一个比较清晰易懂的分片上传示例。示例使用Blazor 和C#以非常流畅的异步代码实现了并发分片上传。但是本示例依然有许多可优化的点,例如实现断点续传,服务端如果没有收到结束请求时的兜底处理等,这些就留给朋友们思考了。
又是很久没有写文章了,一直没有找到什么好选题,难得找到一个,经过将近1周的研究开发终于搞定了。
QQ群

读者交流QQ群:540719365

欢迎读者和广大朋友一起交流,如发现本书错误也欢迎通过博客园、QQ群等方式告知我。
本文地址:https://www.cnblogs.com/coredx/p/17746162.html

来源:https://www.cnblogs.com/coredx/archive/2023/10/07/17746162.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具