并发控制
对于短时间可能发送大量网络请求的场景,为了节约资源,需要进行请求的并发控制。设置最大并发数,当某个请求完成时,才发起新的请求:
/**
* 请求并发控制
* @param {*} requestPool 请求池 (是一个可迭代对象)
* @param {*} poolLimit 最大并发数
*/
/** es7 */
async function concurrencyControl(requestPool, poolLimit) {
/** 用于请求的结果 */
const ret = [];
/** 真正并发执行的请求集合 */
const executing = new Set();
for (const item of requestPool) {
/** 防止返回的不是promise,使用Promise.resolve */
const p = Promise.resolve().then(() => item());
/** 将正在请求的promise放入ret和executing中 */
ret.push(p);
executing.add(p);
/** 请求resolve 或 reject 执行 清除操作 */
const clean = () => executing.delete(p);
p.then(clean).catch(clean);
if (executing.size >= poolLimit) {
// 一旦正在执行的promise列表数量等于限制数,就使用Promise.race等待某一个promise状态发生变更,
// 状态变更后,就会执行上面then的回调,将该promise从executing中删除,
// 然后再进入到下一次for循环,生成新的promise进行补充
await Promise.race(excuting);
}
}
return Promise.all(ret);
}
/** es6 */
function concurrencyControlES6(requestPool, poolLimit) {
let i = 0;
const ret = [];
const executing = new Set();
const enqueue = function() {
if (i === requestPool.length) {
return Promise.resolve();
}
const item = iterable[i++];
const p = Promise.resolve().then(() => item());
ret.push(p);
executing.add(p);
const clean = () => executing.delete(p);
p.then(clean).catch(clean);
let r = Promise.resolve();
if (executing.size >= poolLimit) {
r = Promise.race(executing);
}
return r.then(() => enqueue());
};
return enqueue().then(() => Promise.all(ret));
}
async/await
是 ES7
的特性,用 ES6
也能实现相同的效果,而且ES6的方式的可以动态的添加新的请求:
function main() {
concurrentControlES6(requestPool, 2).then(res => console.log(res));
// 动态添加新请求
requestPool.push(newRequest());
}