雷達 发表于 2024-11-30 04:53:12

[C#] 对图像进行垂直翻转(FlipY)的跨平台SIMD硬件加速向量算法,兼谈并行处

目录

[*]一、标量算法

[*]1.1 算法实现
[*]1.2 基准测试代码

[*]二、向量算法

[*]2.1 算法思路

[*]2.1.1 解决非整数倍带来的难点

[*]2.2 算法实现
[*]2.3 基准测试代码

[*]三、使用系统所提供的MemoryCopy方法进行复制
[*]四、对算法进行检查
[*]五、基准测试结果

[*]5.1 X86 架构
[*]5.2 Arm 架构
[*]5.3 .NET Framework

[*]六、并行处理收益极少的原因分析

[*]6.1 观察测试结果,产生疑问
[*]6.2 数据分析
[*]6.3 找到原因
[*]6.4 进一步优化的空间

[*]附录

将位图进行水平翻转或垂直翻转,这些是图像处理的常用算法。本文介绍最简单的垂直翻转,便于后续文章来探讨复杂的水平翻转算法。
本文除了会给出标量算法外,还会给出向量算法。且这些算法是跨平台的,同一份源代码,能在 X86及Arm架构上运行,且均享有SIMD硬件加速。
一、标量算法

1.1 算法实现

垂直翻转的算法原理是很简单的,就是将位图的每一行重新排列——

[*]第0行放到第(height-1)行;
[*]第1行放到第(height-1-1)行;
[*]第2行放到第(height-1-2)行;
用i表示当前行的话,那么垂直翻转的规律是——

[*]第i行放到第(height-1-i)行。
由于此时是整行数据的复制,所以无需关心像素格式。算好每行的字节数后,便可以用字节复制的办法来复制一行数据了。而水平翻转需区分不同的像素格式,比垂直翻转要复杂的多,后面的文章会详细讲解水平翻转。
下面就是标量算法的源代码。
public static unsafe void ScalarDoBatch(byte* pSrc, int strideSrc, int width, int height, byte* pDst, int strideDst) {
    int strideCommon = Math.Min(Math.Abs(strideSrc), Math.Abs(strideDst));
    byte* pRow = pSrc;
    byte* qRow = pDst + (height - 1) * (long)strideDst; // Set to last row.
    for (int i = 0; i < height; i++) {
      byte* p = pRow;
      byte* q = qRow;
      for (int j = 0; j < strideCommon; j++) {
            *q++ = *p++;
      }
      pRow += strideSrc;
      qRow -= strideDst;
    }
}为了支持任意的像素格式,可以根据stride(跨距)来计算每一行的字节数。注意stride有可能是负数,故需要用Abs函数来计算绝对值。计算出strideCommon后,内循环便根据它,对当前行的数据进行逐个字节复制处理,直至复制完一整行。
外循环采用了“顺序读取、逆序写入”的策略。具体来说——

[*]读取是从第0行开始的,每次循环后移动到下一行。于是在上面的源代码中,pRow的初值就是 pSrc,每次循环后pRow会 加上 跨距strideSrc。
[*]写入是从最后一行开始的,每次循环后移动到前一行。于是在上面的源代码中,qRow的初值是 pDst + (height - 1) * (long)strideDst(目标位图最后一行的地址),每次循环后qRow会 减去 跨距strideDst。
1.2 基准测试代码

使用 BenchmarkDotNet 进行基准测试。
可以使用上一篇文章的公共函数,写好标量算法的基准测试代码。源代码如下。

public void Scalar() {
    ScalarDo(_sourceBitmapData, _destinationBitmapData, false);
}


public void ScalarParallel() {
    ScalarDo(_sourceBitmapData, _destinationBitmapData, true);
}

public static unsafe void ScalarDo(BitmapData src, BitmapData dst, bool useParallel = false) {
    int width = src.Width;
    int height = src.Height;
    int strideSrc = src.Stride;
    int strideDst = dst.Stride;
    byte* pSrc = (byte*)src.Scan0.ToPointer();
    byte* pDst = (byte*)dst.Scan0.ToPointer();
    bool allowParallel = useParallel && (height > 16) && (Environment.ProcessorCount > 1);
    if (allowParallel) {
      Parallel.For(0, height, i => {
            int start = i;
            int len = 1;
            byte* pSrc2 = pSrc + start * (long)strideSrc;
            byte* pDst2 = pDst + (height - 1 - start) * (long)strideDst;
            ScalarDoBatch(pSrc2, strideSrc, width, len, pDst2, strideDst);
      });
    } else {
      ScalarDoBatch(pSrc, strideSrc, width, height, pDst, strideDst);
    }
}由于现在是图像垂直翻转,于是并行算法需按照同样的规则来计算数据的地址。即上面提到的“顺序读取、逆序写入”。
具体来说——

[*]pSrc2 是源数据的指针。由于是顺序读取,故它的计算办法是 pSrc + start * (long)strideSrc.
[*]pDst2 是目标数据的指针。由于是逆序写入,故它的计算办法是 pDst + (height - 1 - start) * (long)strideDst. 编写代码时有时可能会漏掉了其中的"- 1",这是一种常见错误,容易导致指针访问非法地址等问题。请大家一定要细心。
二、向量算法

2.1 算法思路

上面的标量算法是每次复制1个字节,而向量算法可以每次复制1个向量。
若一行数据的字节数,恰好是向量大小的整数倍的话,那可以直接写个简单循环来处理。非常简单。
但在实际使用中,字节数大多数时候并不是向量大小的整数倍,此时处理起来会复杂一些。
2.1.1 解决非整数倍带来的难点

最保守的办法是——先用向量算法将整数倍的数据给处理完,随后对于末尾的数据,退回为标量算法来处理。
该办法确实有效,但存在以下缺点:

[*]需要编写2种算法——向量的和标量的。会带来更多开发成本与维护成本。
[*]末尾的数据是标量处理的,向量硬件加速不起作用。
有没有办法只需写1种算法,且对末尾的数据也能向量化处理呢?
对于“一边读取、一边写入”这种情况,由于读、写的一般是不同的数据区域,故有一种办法来实现上面的期望。该办法的关键思路是——先计算好“末尾指针”(向量处理时最后一笔数据的指针地址),然后在循环内每次均检查指针地址,当发现当前指针已经越过“末尾指针”时便强制将当前指针设置为“末尾指针”,以完成剩余数据的处理。
使用这种办法,能使末尾的数据也能向量化处理。代价就是——部分末尾的数据会被重复处理。但由于现在是“一边读取、一边写入”,故重复处理并不会带来负面影响。
在向量运算的循环中,增加指针地址判断的分支代码,的确会影响性能。但是由于现在CPU都拥有强大的分支预测能力,对于“末尾指针”判断这种在最后一次循环时才生效的分支,分支预测绝大多数是成功的,从而掩盖了该分支判断的花费。
其实本系列的前面几篇文章,也是使用这一办法,使末尾的数据也能向量化处理。所以代码中有“The last block is also use vector”的注释。
(另注:在C语言中,C99标准新增了restrict关键字来标记“指针指向不同的数据区域”情况。restrict用于限定和约束指针,表示这个指针只访问这块内存的唯一方式,也就是告诉编译器,这块内存中的内容的操作都只会通过这个指针,而不会通过其他变量或者指针。C# 目前还没有这样的关键字)
2.2 算法实现

根据上面的思路,编写代码。源代码如下。
public static unsafe void UseVectorsDoBatch(byte* pSrc, int strideSrc, int width, int height, byte* pDst, int strideDst) {
    int strideCommon = Math.Min(Math.Abs(strideSrc), Math.Abs(strideDst));
    int vectorWidth = Vector<byte>.Count;
    int maxX = strideCommon - vectorWidth;
    byte* pRow = pSrc;
    byte* qRow = pDst + (height - 1) * (long)strideDst; // Set to last row.
    for (int i = 0; i < height; i++) {
      Vector<byte>* pLast = (Vector<byte>*)(pRow + maxX);
      Vector<byte>* qLast = (Vector<byte>*)(qRow + maxX);
      Vector<byte>* p = (Vector<byte>*)pRow;
      Vector<byte>* q = (Vector<byte>*)qRow;
      for (; ; ) {
            Vector<byte> data;
            // Load.
            data = *p;
            // Store.
            *q = data;
            // Next.
            if (p >= pLast) break;
            ++p;
            ++q;
            if (p > pLast) p = pLast; // The last block is also use vector.
            if (q > qLast) q = qLast;
      }
      pRow += strideSrc;
      qRow -= strideDst;
    }
}由于只需要利用向量类型来做内存复制,于是可以不用调用 Vector静态类中的方法。甚至连Load、Store 方法也无需使用,因为对向量类型类型进行指针读写操作时,编译器会自动生成“未对齐加载”、“未对齐存储”的指令。
2.3 基准测试代码

随后为该算法编写基准测试代码。

public void UseVectors() {
    UseVectorsDo(_sourceBitmapData, _destinationBitmapData, false);
}


public void UseVectorsParallel() {
    UseVectorsDo(_sourceBitmapData, _destinationBitmapData, true);
}

public static unsafe void UseVectorsDo(BitmapData src, BitmapData dst, bool useParallel = false) {
    int vectorWidth = Vector<byte>.Count;
    int width = src.Width;
    int height = src.Height;
    if (width <= vectorWidth) {
      ScalarDo(src, dst, useParallel);
      return;
    }
    int strideSrc = src.Stride;
    int strideDst = dst.Stride;
    byte* pSrc = (byte*)src.Scan0.ToPointer();
    byte* pDst = (byte*)dst.Scan0.ToPointer();
    bool allowParallel = useParallel && (height > 16) && (Environment.ProcessorCount > 1);
    if (allowParallel) {
      Parallel.For(0, height, i => {
            int start = i;
            int len = 1;
            byte* pSrc2 = pSrc + start * (long)strideSrc;
            byte* pDst2 = pDst + (height - 1 - start) * (long)strideDst;
            UseVectorsDoBatch(pSrc2, strideSrc, width, len, pDst2, strideDst);
      });
    } else {
      UseVectorsDoBatch(pSrc, strideSrc, width, height, pDst, strideDst);
    }
}也是需要细心处理并行时的地址计算。
三、使用系统所提供的MemoryCopy方法进行复制

内循环只是做复制一整行数据的操作,恰好 .NET 系统提供了内存复制的方法“Buffer.MemoryCopy”。于是,利用它也可以完成任务。源代码如下。
public static unsafe void UseCopyDoBatch(byte* pSrc, int strideSrc, int width, int height, byte* pDst, int strideDst) {
    int strideCommon = Math.Min(Math.Abs(strideSrc), Math.Abs(strideDst));
    byte* pRow = pSrc;
    byte* qRow = pDst + (height - 1) * (long)strideDst; // Set to last row.
    for (int i = 0; i < height; i++) {
      Buffer.MemoryCopy(pRow, qRow, strideCommon, strideCommon);
      pRow += strideSrc;
      qRow -= strideDst;
    }
}代码变得更简洁了。
四、对算法进行检查

可按照上一篇的办法,使用SumDifference函数来对各种算法进行检查。
且由于现在是做图像的垂直翻转运算,翻转2次后便能变为原样。于是,现在还可以对 Scalar 算法进行检查。源代码如下。
bool allowCheck = true;
if (allowCheck) {
    try {
      TextWriter writer = Console.Out;
      long totalDifference, countByteDifference;
      int maxDifference;
      double averageDifference;
      long totalByte = Width * Height * 3;
      double percentDifference;
      // Baseline
      ScalarDo(_sourceBitmapData, _expectedBitmapData);
      // Scalar
      ScalarDo(_expectedBitmapData, _destinationBitmapData);
      totalDifference = SumDifference(_sourceBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);
      averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;
      percentDifference = 100.0 * countByteDifference / totalByte;
      writer.WriteLine(string.Format("Difference of Scalar: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));
      // ScalarParallel
      ScalarParallel();
      totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);
      averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;
      percentDifference = 100.0 * countByteDifference / totalByte;
      writer.WriteLine(string.Format("Difference of ScalarParallel: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));
      // UseVectors
      UseVectors();
      totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);
      averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;
      percentDifference = 100.0 * countByteDifference / totalByte;
      writer.WriteLine(string.Format("Difference of UseVectors: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));
      // UseVectorsParallel
      UseVectorsParallel();
      totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);
      averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;
      percentDifference = 100.0 * countByteDifference / totalByte;
      writer.WriteLine(string.Format("Difference of UseVectorsParallel: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));
      // UseCopy
      UseCopy();
      totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);
      averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;
      percentDifference = 100.0 * countByteDifference / totalByte;
      writer.WriteLine(string.Format("Difference of UseCopy: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));
      // UseCopyParallel
      UseCopyParallel();
      totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);
      averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;
      percentDifference = 100.0 * countByteDifference / totalByte;
      writer.WriteLine(string.Format("Difference of UseCopyParallel: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));
    } catch (Exception ex) {
      Debug.WriteLine(ex.ToString());
    }
}五、基准测试结果

5.1 X86 架构

X86架构下的基准测试结果如下。
BenchmarkDotNet v0.14.0, Windows 11 (10.0.22631.4541/23H2/2023Update/SunValley3)
AMD Ryzen 7 7840H w/ Radeon 780M Graphics, 1 CPU, 16 logical and 8 physical cores
.NET SDK 8.0.403
   : .NET 8.0.10 (8.0.1024.46610), X64 RyuJIT AVX-512F+CD+BW+DQ+VL+VBMI
DefaultJob : .NET 8.0.10 (8.0.1024.46610), X64 RyuJIT AVX-512F+CD+BW+DQ+VL+VBMI


| Method             | Width | Mean         | Error      | StdDev   | Ratio | RatioSD |
|------------------- |------ |-------------:|-----------:|-----------:|------:|--------:|
| Scalar             | 1024|1,077.72 us |20.704 us |24.647 us |1.00 |    0.03 |
| ScalarParallel   | 1024|    177.58 us |   3.489 us |   3.263 us |0.16 |    0.00 |
| UseVectors         | 1024|   79.40 us |   1.549 us |   2.713 us |0.07 |    0.00 |
| UseVectorsParallel | 1024|   19.54 us |   0.373 us |   0.547 us |0.02 |    0.00 |
| UseCopy            | 1024|   81.88 us |   1.608 us |   2.034 us |0.08 |    0.00 |
| UseCopyParallel    | 1024|   18.28 us |   0.357 us |   0.351 us |0.02 |    0.00 |
|                  |       |            |            |            |       |         |
| Scalar             | 2048|4,360.82 us |52.264 us |48.888 us |1.00 |    0.02 |
| ScalarParallel   | 2048|    717.40 us |13.745 us |13.499 us |0.16 |    0.00 |
| UseVectors         | 2048|    992.42 us |19.805 us |57.457 us |0.23 |    0.01 |
| UseVectorsParallel | 2048|    409.04 us |   8.070 us |19.022 us |0.09 |    0.00 |
| UseCopy            | 2048|1,002.18 us |19.600 us |27.476 us |0.23 |    0.01 |
| UseCopyParallel    | 2048|    418.30 us |   6.980 us |   5.449 us |0.10 |    0.00 |
|                  |       |            |            |            |       |         |
| Scalar             | 4096| 16,913.07 us | 244.574 us | 216.808 us |1.00 |    0.02 |
| ScalarParallel   | 4096|3,844.09 us |46.626 us |43.614 us |0.23 |    0.00 |
| UseVectors         | 4096|4,419.30 us |84.049 us |78.620 us |0.26 |    0.01 |
| UseVectorsParallel | 4096|4,000.12 us |44.611 us |39.546 us |0.24 |    0.00 |
| UseCopy            | 4096|4,608.49 us |33.594 us |31.424 us |0.27 |    0.00 |
| UseCopyParallel    | 4096|3,960.86 us |47.334 us |44.276 us |0.23 |    0.00 |

[*]Scalar: 标量算法。
[*]ScalarParallel: 并发的标量算法。
[*]UseVectors: 向量算法。
[*]UseVectorsParallel: 并发的向量算法。
[*]UseCopy: 使用“Buffer.MemoryCopy”的算法。
[*]UseCopyParallel: 并发的使用“Buffer.MemoryCopy”的算法。
5.2 Arm 架构

同样的源代码可以在 Arm 架构上运行。基准测试结果如下。
BenchmarkDotNet v0.14.0, macOS Sequoia 15.0.1 (24A348)
Apple M2, 1 CPU, 8 logical and 8 physical cores
.NET SDK 8.0.204
   : .NET 8.0.4 (8.0.424.16909), Arm64 RyuJIT AdvSIMD
DefaultJob : .NET 8.0.4 (8.0.424.16909), Arm64 RyuJIT AdvSIMD


| Method             | Width | Mean         | Error      | StdDev    | Ratio | RatioSD |
|------------------- |------ |-------------:|-----------:|----------:|------:|--------:|
| Scalar             | 1024|1,227.25 us |   0.694 us |0.649 us |1.00 |    0.00 |
| ScalarParallel   | 1024|    261.38 us |   0.739 us |0.617 us |0.21 |    0.00 |
| UseVectors         | 1024|    117.96 us |   0.105 us |0.098 us |0.10 |    0.00 |
| UseVectorsParallel | 1024|   39.46 us |   0.297 us |0.263 us |0.03 |    0.00 |
| UseCopy            | 1024|   92.95 us |   0.081 us |0.063 us |0.08 |    0.00 |
| UseCopyParallel    | 1024|   34.90 us |   0.170 us |0.159 us |0.03 |    0.00 |
|                  |       |            |            |         |       |         |
| Scalar             | 2048|5,236.47 us |69.941 us | 62.001 us |1.00 |    0.02 |
| ScalarParallel   | 2048|    952.35 us |   3.270 us |3.059 us |0.18 |    0.00 |
| UseVectors         | 2048|    700.91 us |   4.339 us |4.058 us |0.13 |    0.00 |
| UseVectorsParallel | 2048|    254.35 us |   1.183 us |1.107 us |0.05 |    0.00 |
| UseCopy            | 2048|    757.75 us |14.775 us | 25.485 us |0.14 |    0.01 |
| UseCopyParallel    | 2048|    252.87 us |   1.078 us |1.009 us |0.05 |    0.00 |
|                  |       |            |            |         |       |         |
| Scalar             | 4096| 20,257.16 us | 100.815 us | 84.185 us |1.00 |    0.01 |
| ScalarParallel   | 4096|3,728.60 us |12.672 us | 11.233 us |0.18 |    0.00 |
| UseVectors         | 4096|2,788.68 us |   2.712 us |2.404 us |0.14 |    0.00 |
| UseVectorsParallel | 4096|1,776.71 us |   1.510 us |1.412 us |0.09 |    0.00 |
| UseCopy            | 4096|2,448.65 us |   4.232 us |3.959 us |0.12 |    0.00 |
| UseCopyParallel    | 4096|1,796.17 us |   5.197 us |4.861 us |0.09 |    0.00 |5.3 .NET Framework

同样的源代码可以在 .NET Framework 上运行。基准测试结果如下。
BenchmarkDotNet v0.14.0, Windows 11 (10.0.22631.4541/23H2/2023Update/SunValley3)
AMD Ryzen 7 7840H w/ Radeon 780M Graphics, 1 CPU, 16 logical and 8 physical cores
   : .NET Framework 4.8.1 (4.8.9282.0), X64 RyuJIT VectorSize=256
DefaultJob : .NET Framework 4.8.1 (4.8.9282.0), X64 RyuJIT VectorSize=256


| Method             | Width | Mean         | Error      | StdDev   | Ratio | RatioSD | Code Size |
|------------------- |------ |-------------:|-----------:|-----------:|------:|--------:|----------:|
| Scalar             | 1024|1,062.91 us |14.426 us |12.788 us |1.00 |    0.02 |   2,891 B |
| ScalarParallel   | 1024|    183.82 us |   3.609 us |   4.296 us |0.17 |    0.00 |   2,894 B |
| UseVectors         | 1024|   71.65 us |   1.420 us |   1.328 us |0.07 |    0.00 |   3,602 B |
| UseVectorsParallel | 1024|   24.67 us |   0.471 us |   0.579 us |0.02 |    0.00 |   3,605 B |
| UseCopy            | 1024|   82.86 us |   1.653 us |   2.262 us |0.08 |    0.00 |   3,280 B |
| UseCopyParallel    | 1024|   24.16 us |   0.481 us |   0.659 us |0.02 |    0.00 |   3,283 B |
|                  |       |            |            |            |       |         |         |
| Scalar             | 2048|4,344.08 us |68.246 us |60.498 us |1.00 |    0.02 |   2,891 B |
| ScalarParallel   | 2048|    681.94 us |12.532 us |11.722 us |0.16 |    0.00 |   2,894 B |
| UseVectors         | 2048|    981.58 us |14.816 us |13.134 us |0.23 |    0.00 |   3,602 B |
| UseVectorsParallel | 2048|    429.28 us |   8.360 us |16.106 us |0.10 |    0.00 |   3,605 B |
| UseCopy            | 2048|    978.79 us |15.720 us |13.127 us |0.23 |    0.00 |   3,280 B |
| UseCopyParallel    | 2048|    438.06 us |   8.691 us |15.672 us |0.10 |    0.00 |   3,283 B |
|                  |       |            |            |            |       |         |         |
| Scalar             | 4096| 17,306.43 us | 343.417 us | 352.664 us |1.00 |    0.03 |   2,891 B |
| ScalarParallel   | 4096|3,717.65 us |18.424 us |17.233 us |0.21 |    0.00 |   2,894 B |
| UseVectors         | 4096|4,451.39 us |84.848 us |87.132 us |0.26 |    0.01 |   3,602 B |
| UseVectorsParallel | 4096|3,818.66 us |24.223 us |22.658 us |0.22 |    0.00 |   3,605 B |
| UseCopy            | 4096|4,721.90 us |88.960 us |83.214 us |0.27 |    0.01 |   3,280 B |
| UseCopyParallel    | 4096|3,820.63 us |19.312 us |18.065 us |0.22 |    0.00 |   3,283 B |六、并行处理收益极少的原因分析

6.1 观察测试结果,产生疑问

观察测试结果,首先能发现这2点情况——

[*]UseVectors与UseCopy的测试结果非常接近,且 UseVectorsParallel的测试结果非常接近。这是因为“Buffer.MemoryCopy”内部也使用了向量算法来进行优化。
[*]对于向量算法(UseVectors、UseCopy等)来说,.NET Framework 与 .NET 8.0 的测试结果非常接近。这是因为垂直翻转仅需要做复制操作,这是非常简单的操作。于是早在 .NET Framework 时代,“Buffer.MemoryCopy”已做好了向量化优化。
随后会发现一个奇怪的地方——并行版算法(Parallel)并没有比单线程算法快多少。尤其是对于4096宽度的图片,16个线程并行处理时,仍需花费 90%左右的时间,并行处理的收益极少。
这很奇怪,因为垂直翻转是可以分解为各行去处理的,非常适合并行化。且仅需做简单的内存复制操作,没有复杂的算术运算。16个线程并行处理时,理论上仅需要 1/16 的时间。但测试结果却是 ——仍需花费 90%左右的时间。
再来看看一下Arm架构的测试结果,会发现也存在并行处理收益极少的现象。但是要稍微好一点,例如。

[*]UseCopy:2,448.65 us
[*]UseCopyParallel:1,796.17 us
1796.17 / 2448.65 ≈ 0.7335。即只需 73.35 % 的时间,但这也比理论值差了不少。
6.2 数据分析

在Width为 4096时,图像高度也是 4096,且使用的是 24位 图像。那么一个图像的总字节数是 4096*4096*3, 即 48MB 左右。
根据UseCopyParallel的结果,来计算一下每秒的处理速率——

[*]X86: 48 / 3.96086 ≈ 12.1185 (GB/s)
[*]Arm: 48 / 1.79617 ≈ 26.7235 (GB/s)
从上面的数据来看,Arm的吞吐率是相对于 X86的倍数是:26.7235/12.1185 ≈ 2.2052。
6.3 找到原因

我这台Arm计算机,CPU是“Appla M2”。查了资料,发现M2具有“100GB/s统一内存带宽”。
而我的X86电脑,用的是DDR5-5600内存条。DDR每次传输64位数据,故内存带宽为 5600 * 64 / 8 = 44800 (MB/s),即 44.8 GB/s。
我们再来计算一下倍数:100 / 44.8 ≈ 2.2321。
“2.2321”与“2.2052”非常接近。这表示现在遇到的是——内存带宽的瓶颈。
这表示在4096时,我们的算法已经基本占满了内存带宽。此时虽然CPU有10多个线程,理论上很多算力还没发挥出来,但由于内存带宽已占满了,故止步于这个值。
由于是“一读一写”,程序需要2次访问内存——

[*]X86: 12.1185*2 = 24.237 (GB/s) (理论上限是 44.8)
[*]Arm: 26.7235*2 = 53.447 (GB/s) (理论上限是 100)
可以看出,程序已经达到内存带宽理论上限的60%左右了。即我们的垂直翻转算法,已经是足够优秀了。
6.4 进一步优化的空间

其实程序是还可以进一步优化的,例如使用 地址对齐、预取、非时间性(NonTemporal)写入、循环展开 等手段。但这些手段比较复杂,编码难度大,更会使程序的可读性大为降低。而且这些手段是特定硬件型号敏感的,例如更换了CPU型号后,很多细节得重新调校。
更关键的是,进一步优化的空间已经非常少了,离理论上限只有40%左右差距。就算达到理论上限,性能也仅提高这 40%左右,没法翻倍。仅当硬件型号固定,且有充足理由时,此时才可考虑做进一步的优化处理。
这也给了我们一个提醒:在编写向量化算法时,为了避免遇到内存带宽瓶颈,程序应尽量少的读写内存。例如使用向量类型进行批量读取,随后尽量将数据在向量类型里处理好,最后使用向量类型进行批量写入。
因为在使用向量类型时,是在CPU内的向量寄存器(Register)中处理的。寄存器与CPU同频工作,吞吐率比内存高了几个数量级。寄存器不够用时,一般会挪到Cache(高速缓存)上进行周转,Cache的速度也比内存快很多。
附录


[*]完整源代码: https://github.com/zyl910/VectorTraits.Sample.Benchmarks/blob/main/VectorTraits.Sample.Benchmarks.Inc/Image/ImageFlipYBenchmark.cs
[*]VectorTraits 的NuGet包: https://www.nuget.org/packages/VectorTraits
[*]VectorTraits 的在线文档: https://zyl910.github.io/VectorTraits_doc/
[*]VectorTraits 源代码: https://github.com/zyl910/VectorTraits
    出处:http://www.cnblogs.com/zyl910/    版权声明:自由转载-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 3.0.
来源:https://www.cnblogs.com/zyl910/p/18577874/VectorTraits_Sample_Image_ImageFlipYBenchmark
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: [C#] 对图像进行垂直翻转(FlipY)的跨平台SIMD硬件加速向量算法,兼谈并行处