翼度科技»论坛 编程开发 JavaScript 查看内容

手写 p-map(控制并发数以及迭代处理 promise 的库)

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
介绍

p-map 是一个迭代处理 promise 并且能控制 promise 执行并发数的库。作者是 sindresorhus,他还创建了许多关于 promise 的库 promise-fun,感兴趣的同学可以去看看。
之前 提到的 p-limit 也是一个控制请求并发数的库,控制并发数方面,两者作用相同,不过 p-map 增加了对请求(promise)的迭代处理。
之前 p-limit 的用法如下,limit 接受一个函数;
  1. var limit = pLimit(8); // 设置最大并发数量为 2
  2. var input = [ // Limit函数包装各个请求
  3.     limit(() => fetchSomething('1')),
  4.     limit(() => fetchSomething('2')),
  5.     limit(() => fetchSomething('3')),
  6.     limit(() => fetchSomething('4'))
  7. ];
  8. // 执行请求
  9. Promise.all(input).then(res =>{
  10.     console.log(res)
  11. })
复制代码
而 p-map 则通过用户传进来的 mapper 处理函数处理的一个集合(准确的说是一个可迭代对象);
  1. import pMap from 'p-map';
  2. import got from 'got';
  3. const sites = [
  4.         getWebsiteFromUsername('sindresorhus'), //=> Promise
  5.         'https://avajs.dev',
  6.         'https://github.com'
  7. ];
  8. const mapper = async site => {
  9.         const {requestUrl} = await got.head(site);
  10.         return requestUrl;
  11. };
  12. // 接收三个参数,一个是可迭代对象,一个是对可迭代对象进行处理的函数,一个是配置选项
  13. const result = await pMap(sites, mapper, {concurrency: 2});
  14. console.log(result);
  15. //=> ['https://sindresorhus.com/', 'https://avajs.dev/', 'https://github.com/']
复制代码
默认的可迭代对象有StringArrayTypedArrayMapSet 、Intl.Segments,而要成为可迭代对象,该对象必须实现 [Symbol.iterator]() 方法;
遍历可迭代对象时,实际上是根据迭代器协议进行遍历;
比如,迭代一个数组是这样的
  1. var iterable = [1,2,3,4]
  2. var iterator = iterable[Symbol.iterator]() // iterator 是迭代器
  3. iterator.next() // {value: 1, done: false}
  4. iterator.next() // {value: 2, done: false}
  5. iterator.next() // {value: 3, done: false}
  6. iterator.next() // {value: 4, done: false}
  7. iterator.next() // {value: undefined, done: true}
复制代码
当数组迭代完成后,会返回 {value: undefined, done: true}
p-pap 控制并发请求的原理是,对传进来的集合进行迭代,当集合第一个元素(元素可能是异步函数)执行完后,会交给 mapper 函数处理,mapper 处理完后,才开始迭代下一个元素,这样就保持了按照顺序一个个迭代,此时并发数是1;
要做到并发是n,并且还能执行上面的迭代,作者很巧妙的用了 for 循环
  1. for(let i=0;i<n;i++)
  2.     next()
  3. }
复制代码
集合第一个元素和第四个元素是一个 promise 函数
p-map 接收三个参数,分别是要迭代的对象,mapper 处理函数,自定义配置;返回值是 promise, 如下面所示
  1. var fetchSomething = (str,ms) =>{
  2.     return new Promise((resolve,reject) =>{
  3.         setTimeout(() =>{
  4.             resolve(parseFloat(str))
  5.         },ms)
  6.     })
  7. }
  8. var arr= [
  9.     fetchSomething('1a' ,1000), // promise
  10.     2,3,
  11.     fetchSomething( '4a' , 5000), // promise
  12.     5,6,7,8,9,10
  13. ]
复制代码
拿到可迭代对象后,对它进行递归迭代,直至迭代完毕;这里定义一个内部递归迭代函数 next
  1. var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {});
复制代码
迭代对象中每个元素都是按顺序迭代的;如果元素是异步函数时,需要先等异步函数兑现,并且兑现后的值传给 mapper 函数,等到 mapper 函数兑现或者拒绝后才继续迭代下一个元素
  1. var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {
  2.     var iterator = iterable[Symbol.iterator]()
  3.     var next= ()=>{
  4.         var item=iterator.next()
  5.         if(item.done){
  6.             return
  7.         }
  8.         next()
  9.     }
  10.    
  11. });
复制代码
并且每次迭代彻底完成后保存兑现的结果
  1. var iterator = iterable[Symbol.iterator]()
  2. var next = () => {
  3.     var item = iterator.next()
  4.     if (item.done) {
  5.         return
  6.     }
  7.     Promise.resolve(item.value)
  8.         .then(res => mapper(res))
  9.         .then(res2 => {
  10.             next()
  11.         })
  12. }
复制代码
当整个迭代完后,并且元素全部执行(兑现)完,输出结果集
  1. var iterator = iterable[Symbol.iterator]()
  2. var index = 0 // 序号,根据可迭代对象的顺序保存结果
  3. var ret = []// 保存结果
  4. var next = () => {
  5.     var item = iterator.next()
  6.     if (item.done) {
  7.         return
  8.     }
  9.     var currentIndex = index //保存当前元素序号,用于存入结果
  10.     index++ //下一个元素的序号
  11.     Promise.resolve(item.value)
  12.         .then(res => mapper(res))
  13.         .then(res2 => {
  14.             ret[currentIndex] = res2
  15.             next()
  16.         })
  17. }
复制代码
配置项 stopOnError

传入 p-map 配置项中有一个参数是 stopOnError,表示当执行遇到错误,是否终止迭代循环,所以这里在 .catch() 里面做判断;
  1. var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {
  2.     var activeCount = 0 //正在执行的元素个数
  3.     var next = () => {
  4.         var item = iterator.next()
  5.         if (item.done) {  //元素全部迭代完
  6.             if (activeCount == 0) {
  7.                 resolve(ret)  //元素全部执行(兑现)完,输出结果集
  8.             }
  9.             return
  10.         }
  11.         var currentIndex = index // 保存当前元素序号,用干存入结果
  12.         index++ //下一个元素的序号
  13.         activeCount++
  14.         Promise.resolve(item.value)
  15.             .then(res => mapper(res))
  16.             .then(res2 => {
  17.                 ret[currentIndex] = res2
  18.                 activeCount--
  19.                 next()
  20.             })
  21.             .catch(err => {
  22.                 activeCount--
  23.             })
  24.     }
  25. })
复制代码
忽略错误执行结果 pMapSkip

mapper 函数是用户自定义的, 如果 mapper 执行错误,用户期望忽略错误执行结果,只保留正确结果,这该怎么做呢?,此时 pMapSkip 就登场了;
p-map 源码中提供了 pMapSkip,pMap5kip 是一个 Symbol 值,p-map 内部处理则是:当结果集收到的结果是 pMapSkip,则会在迭代完成后清除返回值是 pMapSkip 的元素,也就是说 mapper 处理时发生错误, 用户不想要这个值,可以 reject(pMapSkip) 比如:
  1. Promise.resolve(item.value)
  2.     .then(res = mapper(res))
  3.     .then(res2 => {
  4.         // ...
  5.     })
  6.     .catch(err => {
  7.         ret[currentIndex] == err // 将错误的结果也保存起来
  8.         
  9.         if (stopOnError) {
  10.             hasError = true
  11.             reject(err) // 发生错误,终止循环
  12.         }else {
  13.             hasError = false
  14.             activeCount--
  15.             next()
  16.         }
  17.     }
复制代码
所以当 mapper 返回 pMapSkip 时,需要标记对应的元素
  1. import pMap, { pMapSkip } from 'p-map'
  2. var arr = [
  3.     fetchSomething('1a', 1000, true),
  4.     2, 3
  5. ]
  6. var mapper = (item, index) => {
  7.     return new Promise((resolve, reject) => {
  8.         return item == 2 ? reject(pMapSkip): resolve(parseFloat(item)) // 元素是 2 ,抛出错误
  9.     })
  10. }
  11. (async () => {
  12.     const result = await pMap(arr, mapper, { concurrency: 2 });
  13.     console.log(result); //=>[1,3]
  14. })();
复制代码
记录需要剔除的元素的位置
  1. var skipIndexArr= []
复制代码
并且在迭代结束时剔除结果集中的有 pMapSkip 的元素
  1. var skipIndexArr = [];
  2. Promise.resolve(item.value)
  3.   .then((res = mapper(res)))
  4.   .then((res2) => {
  5.     // ...
  6.   })
  7.   .catch((err) => {
  8.     if (err === pMapSkip) {
  9.       skipIndexArr.push(currentIndex); //记录需要剔除的元素的位置
  10.     } else {
  11.       ret[currentIndex] == err;
  12.       if (stopOnError) {
  13.         hasError = true;
  14.         reject(err);
  15.       } else {
  16.         hasError = false;
  17.         activeCount--;
  18.         next();
  19.       }
  20.     }
  21.   });
复制代码
在数据里大的情况下,频繁使用 splice 性能可能没那么好,因为执行 splice 后,其后的元素的索引都会改变;那么就要改造下,将 skipIndexArr 改为 Map 形式。
  1. // import pMap, { pMapSkip } from 'p-map'
  2. var arr = [
  3.     fetchSomething('1a', 1000, true),
  4.     2, 3
  5. ]
  6. var mapper = (item, index) => {
  7.     return new Promise((resolve, reject) => {
  8.         return item == 2 ? reject(pMapSkip): resolve(parseFloat(item)) // 元素是 2 ,抛出错误
  9.     })
  10. }
  11. (async () => {
  12.     const result = await pMap(arr, mapper, { concurrency: 2 });
  13.     console.log(result); //=>[1,3]
  14. })();var skipIndexArr= new Map()
复制代码
记录需要删除的元素的位置
  1. // var skipIndexArr= []
  2. var skipIndexArr= new Map()
复制代码
然后迭代结束时,不再在原数组里面 splice ,改为用新数组接收;push 比 splice 性能好;
  1. if (err === pMapSkip) {
  2.   skipIndexArr.set(currentIndex, err);
  3. }
复制代码
在外部取消 p-map 的请求或者取消迭代: AbortController

存在某些情况,当我们不再需要 p-map 返回的结果,或者不再想要使用 p-map 时,我们就需要在外部取消 p-map 的请求或者取消迭代,这时就可以使用 AbortController;
简单介绍下 AbortController 的用法,有一个请求 fetchSomething
  1. if (item.done) {
  2.   if (activeCount == 0) {
  3.     if (skipIndexArr.size === 0) {
  4.       resolve(ret);
  5.       return;
  6.     }
  7.     const pureRet = [];
  8.     for (const [index, value] of ret.entries()) {
  9.       if (skipIndexArr.get(index) === pMapSkip) {
  10.         continue;
  11.       }
  12.       pureRet.push(value);
  13.     }
  14.     resolve(pureRet);
  15.   }
  16.   return;
  17. }
复制代码
想要取消 fetchSomething 请求,就需要传一个 signal 到里面;signal 是 AbortController 的实例属性;AbortController 和请求之间就是由 signal 建立关联;
  1. var fetchSomething = (str) => {
  2.     return new Promise((resolve, reject) => {
  3.         setTimeout(() => {
  4.             resolve(str)
  5.         }, 1000)
  6.     })
  7. }
复制代码
建立关联后,外部取消请求使用的是 AbortController 实例方法 var controller = new AbortController()
var signal = controller.signal

var fetchSomething = (str,signal) => {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve(str)
        }, 1000)
    })
}

fetchSomething('fetch',signal).then(res => {
    console.log('res:', res)
}).catch(err => {
    console.log('err:', err)
});
  1. var controller = new AbortController()
  2. var signal = controller.signal
  3. var fetchSomething = (str,signal) => {
  4.     return new Promise((resolve, reject) => {
  5.         setTimeout(() => {
  6.             resolve(str)
  7.         }, 1000)
  8.     })
  9. }
  10. fetchSomething('fetch',signal).then(res => {
  11.     console.log('res:', res)
  12. }).catch(err => {
  13.     console.log('err:', err)
  14. })
复制代码
然后在请求里面监听外部是否调用了 var controller = new AbortController()
var signal = controller.signal

var fetchSomething = (str,signal) => {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve(str)
        }, 1000)
    })
}

fetchSomething('fetch',signal).then(res => {
    console.log('res:', res)
}).catch(err => {
    console.log('err:', err)
});有两种方式
  1. controller.abort()
复制代码
完整示例:
  1. var controller = new AbortController()var signal = controller.signalvar fetchSomething = (str,signal) => {    return new Promise((resolve, reject) => {        signal.addEventListener('abort', () => {            console.log(' addEventListener')            reject('addEventListener取消')        }, false)        setTimeout(() => {            if (signal.aborted) {                console.log('aborted')                reject('aborted取消')                return            }            console.log('进入setTimeout')            resolve(str)        }, 1000)    })}setTimeout(() => {    var controller = new AbortController()
  2. var signal = controller.signal
  3. var fetchSomething = (str,signal) => {
  4.     return new Promise((resolve, reject) => {
  5.         setTimeout(() => {
  6.             resolve(str)
  7.         }, 1000)
  8.     })
  9. }
  10. fetchSomething('fetch',signal).then(res => {
  11.     console.log('res:', res)
  12. }).catch(err => {
  13.     console.log('err:', err)
  14. })}, 500)fetchSomething('fetch',signal).then(res => {    console.log('res:', res)}).catch(err => {    console.log('err:', err)})
复制代码
500ms 后输出:
  1. var controller = new AbortController()
  2. var signal = controller.signal
  3. var fetchSomething = (str,signal) => {
  4.     return new Promise((resolve, reject) => {
  5.         signal.addEventListener('abort', () => {
  6.             console.log(' addEventListener')
  7.             reject('addEventListener取消')
  8.         }, false)
  9.         setTimeout(() => {
  10.             if (signal.aborted) {
  11.                 console.log('aborted')
  12.                 reject('aborted取消')
  13.                 return
  14.             }
  15.             console.log('进入setTimeout')
  16.             resolve(str)
  17.         }, 1000)
  18.     })
  19. }
  20. setTimeout(() => {
  21.     controller.abort()
  22. }, 500)
  23. fetchSomething('fetch',signal).then(res => {
  24.     console.log('res:', res)
  25. }).catch(err => {
  26.     console.log('err:', err)
  27. })
复制代码
结合 p-map 使用如下:
  1. addEventListener
  2. err: addEventListener取消
  3. aborted
复制代码
那么 p-map 内部实现就好写了:
  1. import pMap from 'p-map';
  2. const abortController = new AbortController();
  3. setTimeout(() => {
  4.         abortController.abort();
  5. }, 500);
  6. const mapper = async value => value;
  7. await pMap([fetchSomething(1000), fetchSomething(1000)], mapper, {signal: abortController.signal});
  8. // 500 ms 结束 pMap 方法,抛出错误信息.
复制代码
手写的完整源码:

下面的源码也可以把 promise.then() 和 .catch() 写法改进为 async await + try catch 写法;
  1. var pMap = (iterable, mapper, options) => new Promise((resolve, reject) => {
  2.     var { signal } = options
  3.     if (signal) {
  4.         if (signal.aborted) {
  5.             reject(signal.reason);
  6.         }
  7.         signal.addEventListener('abort', () => {
  8.             reject(signal.reason);
  9.         });
  10.     }
  11.     var next = () =>{
  12.         //...
  13.     }
  14. })
复制代码
测试一下

1、测试 pMapSkip
  1. let pMapSkip = Symbol('skip')
  2. var pMap = (iterable, mapper, options) => new Promise((resolve, reject) => {
  3.     var iterator = iterable[Symbol.iterator]()
  4.     var index = 0 // 序号,根据可迭代对象的顺序保存结果
  5.     var ret = []// 保存结果
  6.     var activeCount = 0 //正在执行的元素个数
  7.     var isIterableDone = false
  8.     var hasError = false
  9.     var skipIndexArr = new Map()
  10.     var { signal, stopOnError, concurrency } = options
  11.     if (signal) {
  12.         if (signal.aborted) {
  13.             reject(signal.reason);
  14.         }
  15.         signal.addEventListener('abort', () => {
  16.             reject(signal.reason);
  17.         });
  18.     }
  19.     var next = () => {
  20.         var item = iterator.next()
  21.         if (item.done) {
  22.             isIterableDone = true
  23.             if (activeCount == 0) {
  24.                 if (skipIndexArr.size === 0) {
  25.                     resolve(ret);
  26.                     return;
  27.                 }
  28.                 const pureRet = [];
  29.                 for (const [index, value] of ret.entries()) {
  30.                     if (skipIndexArr.get(index) === pMapSkip) {
  31.                         continue;
  32.                     }
  33.                     pureRet.push(value);
  34.                 }
  35.                 resolve(pureRet);
  36.             }
  37.             return;
  38.         }
  39.         var currentIndex = index // 保存当前元素序号,用干存入结果
  40.         index++ //下一个元素的序号
  41.         activeCount++
  42.         Promise.resolve(item.value)
  43.             .then(res => mapper(res))
  44.             .then(res2 => {
  45.                 ret[currentIndex] = res2
  46.                 activeCount--
  47.                 next()
  48.             })
  49.             .catch(err => {
  50.                 ret[currentIndex] == err;
  51.                 if (stopOnError) {
  52.                     hasError = true;
  53.                     reject(err);
  54.                 } else {
  55.                     ret[currentIndex] == err;
  56.                     if (err === pMapSkip) {
  57.                         skipIndexArr.set(currentIndex, err);
  58.                     }
  59.                     hasError = false;
  60.                     activeCount--;
  61.                     next();
  62.                 }
  63.             })
  64.     }
  65.     for (let k = 0; k < concurrency; k++) {
  66.         if (isIterableDone) {
  67.             break
  68.         }
  69.         next()
  70.     }
  71. })
复制代码
2、测试中止请求
  1. var controller = new AbortController()var signal = controller.signalpMap(arr, mapper, { concurrency: 2,signal:signal }).then(res => {    console.log(res)}).catch(err =>{    console.log(err) // 500ms 后打印 AbortError: signal is aborted without reason})setTimeout(() =>{    var controller = new AbortController()
  2. var signal = controller.signal
  3. var fetchSomething = (str,signal) => {
  4.     return new Promise((resolve, reject) => {
  5.         setTimeout(() => {
  6.             resolve(str)
  7.         }, 1000)
  8.     })
  9. }
  10. fetchSomething('fetch',signal).then(res => {
  11.     console.log('res:', res)
  12. }).catch(err => {
  13.     console.log('err:', err)
  14. })},500)
复制代码
3、测试 stopOnError
  1. var controller = new AbortController()
  2. var signal = controller.signal
  3. pMap(arr, mapper, { concurrency: 2,signal:signal }).then(res => {
  4.     console.log(res)
  5. }).catch(err =>{
  6.     console.log(err) // 500ms 后打印 AbortError: signal is aborted without reason
  7. })
  8. setTimeout(() =>{
  9.     controller.abort()
  10. },500)
复制代码
至此,p-map 核心功能实现完了;感兴趣的同学可以点点赞;

来源:https://www.cnblogs.com/zsxblog/p/18453161
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具