翼度科技»论坛 编程开发 JavaScript 查看内容

控制请求并发数量:p-limit 源码解读

8

主题

8

帖子

24

积分

新手上路

Rank: 1

积分
24
p-limit 是一个控制请求并发数量的库,他的整体代码不多,思路挺好的,很有学习价值;
举例

当我们同时发起多个请求时,一般是这样做的
  1. Promise.all([
  2.     requestFn1,
  3.     requestFn2,
  4.     requestFn3
  5. ]).then(res =>{})
复制代码
或者
  1. requestFn1()
  2. requestFn2()
  3. requestFn3()
复制代码
而使用 p-limit 限制并发请求数量是这样做的:
  1. var limit = pLimit(8); // 设置最大并发数量为 8
  2. var input = [ // Limit函数包装各个请求
  3.     limit(() => fetchSomething('1')),
  4.     limit(() => fetchSomething('2')),
  5.     limit(() => fetchSomething('3')),
  6.     limit(() => fetchSomething('4')),
  7.     limit(() => fetchSomething('5')),
  8.     limit(() => fetchSomething('6')),
  9.     limit(() => fetchSomething('7')),
  10.     limit(() => fetchSomething('8')),
  11. ];
  12. // 执行请求
  13. Promise.all(input).then(res =>{
  14.     console.log(res)
  15. })
复制代码
上面 input 数组包含了 8 个 limit 函数,每个 limit 函数包含了要发起的请求
当设置最大并发数量为 8 时,上面 8 个请求会同时执行
来看下效果,假设每个请求执行时间为1s。
  1. var fetchSomething = (str) => {
  2.     return new Promise((resolve, reject) => {
  3.         setTimeout(() => {
  4.             console.log(str)
  5.             resolve(str)
  6.         }, 1000)
  7.     })
  8. }
复制代码
当设置并发请求数量为 2 时

当设置并发请求数量为 3 时

p-limit 限制并发请求数量本质上是,在内部维护了一个请求队列;
当请求发起时,先将请求推入队列,判断当前执行的请求数量是否小于配置的请求并发数量,如果是则执行当前请求,否则等待正在发起的请求中谁请求完了,再从队列首部取出一个执行;
源码(v2.3.0)

pLimit 源码如下(这个源码是 v2.3.0 版本的,因为项目中引入的版本比较早。后面会分析从 2.3.0 到最新版本的源码,看看增加或者改进了什么):
  1. 'use strict';
  2. const pTry = require('p-try');
  3. const pLimit = concurrency => {
  4.     // 限制为正整数
  5.     if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
  6.         return Promise.reject(new TypeError('Expected `concurrency` to be a number from 1 and up'));
  7.     }
  8.     const queue = []; // 请求队列
  9.     let activeCount = 0; // 当前并发的数量
  10.     const next = () => { // 一个请求完成时执行的回调
  11.         activeCount--;
  12.         if (queue.length > 0) {
  13.             queue.shift()();
  14.         }
  15.     };
  16.     const run = (fn, resolve, ...args) => { // 请求开始执行
  17.         activeCount++;
  18.         const result = pTry(fn, ...args);
  19.         resolve(result); // 将结果传递给 generator
  20.         result.then(next, next); // 请求执行完调用回调
  21.     };
  22.     // 将请求加入队列
  23.     const enqueue = (fn, resolve, ...args) => {
  24.         if (activeCount < concurrency) {
  25.             run(fn, resolve, ...args);
  26.         } else {
  27.             queue.push(run.bind(null, fn, resolve, ...args));
  28.         }
  29.     };
  30.     const generator = (fn, ...args) => new Promise(resolve => enqueue(fn, resolve, ...args));
  31.    
  32.     // 暴露内部属性给外界
  33.     Object.defineProperties(generator, {
  34.         activeCount: {
  35.             get: () => activeCount
  36.         },
  37.         pendingCount: {
  38.             get: () => queue.length
  39.         },
  40.         clearQueue: {
  41.             value: () => {
  42.                 queue.length = 0;
  43.             }
  44.         }
  45.     });
  46.     return generator;
  47. };
  48. module.exports = pLimit;
  49. module.exports.default = pLimit;
复制代码
下面一一剖析下
1、pLimit 函数整体是一个闭包函数,返回了一个名叫 generator 的函数,由 generator 处理并发逻辑,
generator 返回值必须是 promise,这样才能被 Promise.all 捕获到
  1. const generator = (fn,...args) => new Promise((resolve,reject)=7enqueue(fn,resolve,...args))
复制代码
2、在 enqueue 函数里面
  1. // 将请求加入队列
  2. const enqueue = (fn, resolve, ...args) => {
  3.     if (activeCount < concurrency) {
  4.         run(fn, resolve, ...args);
  5.     } else {
  6.         queue.push(run.bind(null, fn, resolve, ...args));
  7.     }
  8. };
复制代码
activeCount 表示正在执行的请求数量,当 activeCount 小于配置的并发数量(concurrency)时,则可以执行当前的 fn(执行 run 函数),否则推入请求队列等待。
3、run 函数接收了三个形参
  1. const run = (fn, resolve, ...args) => { // 请求开始执行
  2.     activeCount++;
  3.     const result = pTry(fn, ...args);
  4.     resolve(result);
  5.     result.then(next, next);
  6. };
复制代码

  • fn 表示执行的请求,
  • resolve 由 generator 定义并往下传,一直跟踪到请求执行完毕后,调用 resolve(result); 代表 generator 函数 fulfilled
  • ···args 表示其余的参数,最终会作为 fn 的参数。
4、执行 run 函数时
  1. const run = (fn, resolve, ...args) => { // 请求开始执行
  2.     activeCount++; // 请求开始执行,当前请求数量 +1
  3.     const result = pTry(fn, ...args);
  4.     resolve(result);
  5.     result.then(next, next);
  6. };
复制代码
这里执行 fn 使用的是 const result = pTry(fn,...args), pTry 的作用就是创建一个 promise 包裹的结果,不论 fn 是同步函数还是异步函数
  1. // pTry 源码
  2. const pTry = (fn,...args) => new Promise((resolve,reject) => resolve(fn(...args)));
复制代码
现在 fn 执行(fn(...args))完毕并兑现(resolve(fn(...args)))之后,result 就会兑现。
result 兑现后,generator 的 promise 也就兑现了( resolve(result) ),那么当前请求 fn 的流程就执行完了。
5、当前请求执行完后,对应的当前正在请求的数量也要减一,activeCount--
  1. const next = () => { // 一个请求完成时执行的回调
  2.     activeCount--;
  3.     if (queue.length > 0) {
  4.         queue.shift()();
  5.     }
  6. };
复制代码
然后继续从队列头部取出请求来执行
6、最后暴露内部属性给外界
  1. Object.defineProperties(generator, {
  2.     activeCount: { // 当前正在请求的数量
  3.         get: () => activeCount
  4.     },
  5.     pendingCount: { // 等待执行的数量
  6.         get: () => queue.length
  7.     },
  8.     clearQueue: {
  9.         value: () => {
  10.             queue.length = 0;
  11.         }
  12.     }
  13. });
复制代码
源码(v2.3.0)=> 源码(v6.1.0)

v2.3.0 到最新的 v6.1.0 版本中间加了一些改进
1、v3.0.0:始终异步执行传进 limit 的函数


在 3.0.0 中,作者将请求入队放在前面,将 if 判断语句和请求执行置于微任务中运行;正如源码注释中解释的:因为当 run 函数执行时,activeCount 是异步更新的,那么这里的 if 判断语句也应该异步执行才能实时获取到 activeCount 的值。
这样一开始批量执行 limit(fn) 时,将会先把这些请求全部放入队列中,然后再根据条件判断是否执行请求;
2、v3.0.2:修复传入的无效并发数引起的错误;


将 return Promise.reject 改为了直接 throw 一个错误
3、v3.1.0:移除 pTry 的依赖;改善性能;


移除了 pTry 依赖,改为了 async 包裹,上面有提到,pTry 是一个 promise 包装函数,返回结果是一个 promise;两者本质都是一样;
增加了 yocto-queue 依赖,yocto-queue是一个队列数据结构,用队列代替数组,性能更好;队列的入队和出队操作时间复杂度是 O(1),而数组的 shift() 是 O(n);
4、v5.0.0:修复上下文传播问题


引入了 AsyncResource
  1. export const AsyncResource = {
  2.     bind(fn, _type, thisArg) {
  3.         return fn.bind(thisArg);
  4.     }
  5. }
复制代码
这里用 AsyncResource.bind() 包裹 run.bind(undefined, fn, resolve, args) ,其实不是太明白为啥加这一层。。。这里用的到三个参数(fn,resolve,args)都是通过函数传参过来的,和 this 没关系吧,各位知道的可以告知下么。
相关 issue 在这里
5、6.0.0:性能优化,主要优化的地方在下面


移除了 AsyncResource.bind(),改为使用一个立即执行的 promise,并将 promise 的 resolve 方法插入队列,一旦 resolve 完成兑现,调用相应请求;相关 issue 在这里
6、v6.1.0:允许实时修改并发限制数


改变并发数后立马再检测是否可以执行请求;
最后

在上面第4点的,第5点中的优化没太看明白,因为执行请求用的到三个参数(fn,resolve,args)都是通过函数传参过来的,看起来 this 没关系,为啥要进行多层 bind 绑定呢?各位知道的可以不吝赐教下么。

来源:https://www.cnblogs.com/zsxblog/p/18426066
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具