 并行限制的Promise
          并行限制的Promise
        
 在 JavaScript 中实现 并行限制的 Promise(即控制同时运行的异步任务数量)主要有以下几种方式:
# 一、基于调度器类(Scheduler)的队列管理
核心思想:通过维护任务队列和当前运行任务计数器,动态调度新任务。
 实现步骤:
- 定义 Scheduler类,包含任务队列、最大并发数、当前运行任务计数器。
- 通过 add方法添加任务,触发任务调度。
- 使用递归或循环,在任务完成时从队列中取出下一个任务执行。
class Scheduler {
  constructor(max) {
    this.max = max;
    this.queue = [];
    this.running = 0;
  }
  add(promiseCreator) {
    this.queue.push(promiseCreator);
    this.run();
  }
  run() {
    while (this.running < this.max && this.queue.length) {
      const task = this.queue.shift();
      this.running++;
      task().finally(() => {
        this.running--;
        this.run();
      });
    }
  }
}
特点:
- 灵活控制队列,支持动态添加任务。
- 每个任务完成后自动触发下一个任务(类似“接力”机制)。
函数实现
function createScheduler(max) {
  let queue = [];
  let running = 0;
  function run() {
    while (running < max && queue.length) {
      const task = queue.shift();
      running++;
      task().finally(() => {
        running--;
        run();
      });
    }
  }
  return function (promiseCreator) {
    queue.push(promiseCreator);
    run();
  };
}
改成两个参数的版本
// 版本1:基础执行版(不收集结果)
function concurrentRun(limit = 2, promises) {
  let queue = [...promises];
  let running = 0;
  function runner() {
    while (running < limit && queue.length) {
      const task = queue.shift();
      running++;
      task().finally(() => {
        running--;
        runner();
      });
    }
  }
  runner();
}
// 版本2:Promise 结果收集版
function concurrentRunWithResults(limit, promises) {
  return new Promise((resolve) => {
    const results = new Array(promises.length);
    let queue = promises.map((fn, i) => ({ fn, index: i }));
    let running = 0;
    let completed = 0;
    async function runner() {
      while (running < limit && queue.length) {
        const { fn, index } = queue.shift();
        running++;
        
        try {
          results[index] = await fn();
        } catch (e) {
          results[index] = e;
        } finally {
          running--;
          completed++;
          
          if (completed === promises.length) {
            resolve(results);
          } else {
            runner();
          }
        }
      }
    }
    runner();
  });
}

# 二、递归 + 分批并发(Promise.all 分片)
核心思想:将任务数组按批次切割,每批用 Promise.all 执行,递归处理剩余批次。
 实现步骤:
- 将任务数组按 max值分片。
- 对每片任务执行 Promise.all,递归处理下一片。
代码示例(参考 [1] [5]):
async function batchRun(tasks, max) {
  let results = [];
  for (let i = 0; i < tasks.length; i += max) {
    const batch = tasks.slice(i, i + max);
    results = results.concat(await Promise.all(batch.map(fn => fn())));
  }
  return results;
}
特点:
- 实现简单,但无法动态调整并发数。
- 任务必须预先全部加载,无法中途添加。
# 三、Promise.race 动态控制并发池
核心思想:利用 Promise.race 监听并发池中的任务完成事件,动态替换新任务。
 实现步骤:
- 初始化并发池(数组),填充初始任务。
- 使用 Promise.race监听池内任务,任一完成则替换新任务。
代码示例(参考 [5] [8]):
async function parallelLimit(tasks, max) {
  const pool = new Set();
  for (const task of tasks) {
    const promise = task().then(() => pool.delete(promise));
    pool.add(promise);
    if (pool.size >= max) await Promise.race(pool);
  }
  await Promise.all(pool);
}
特点:
- 动态维持最大并发数,无需预分片。
- 内存占用较高(需维护池对象)。
# 四、生成器函数 + 异步迭代
核心思想:利用生成器函数逐个产出任务,通过异步迭代控制执行节奏。
 代码示例(参考 [7]):
async function* taskGenerator(tasks) {
  for (const task of tasks) yield task();
}
async function runWithLimit(tasks, max) {
  const iterator = taskGenerator(tasks);
  const workers = Array(max).fill(iterator).map(async (it) => {
    for await (const result of it) console.log(result);
  });
  await Promise.all(workers);
}
特点:
- 代码可读性强,适合复杂任务流。
- 需要 ES2018 的 for await...of支持。
# 五、第三方库(推荐)
常用库:
- p-limit:轻量级库,支持链式调用([5])。
- tiny-async-pool:极简实现,核心代码仅 10 行([5])。
使用示例:
import pLimit from 'p-limit';
const limit = pLimit(2); // 最大并发数 2
const tasks = [/* 异步函数数组 */];
const results = await Promise.all(tasks.map(task => limit(task)));
# 关键对比
| 方法 | 动态添加任务 | 预加载任务 | 实现复杂度 | 适用场景 | 
|---|---|---|---|---|
| 调度器类 | ✔️ | ❌ | 中等 | 需要灵活控制的长期任务 | 
| 递归分片 | ❌ | ✔️ | 简单 | 固定任务数组 | 
| Promise.race 动态池 | ✔️ | ✔️ | 中等 | 高实时性任务 | 
| 生成器迭代 | ✔️ | ✔️ | 较高 | 复杂任务流 | 
| 第三方库 | ✔️ | ✔️ | 低 | 生产环境 | 
# 选择建议
- 简单场景:使用 递归分片或第三方库(如p-limit)。
- 动态任务:选择 调度器类或Promise.race 动态池。
- 错误处理:注意在任务失败时是否需要终止整个流程(Promise.all会整体失败,Promise.allSettled更灵活,参考 [1])。
以上方案均可根据实际需求调整错误处理、任务优先级等逻辑。
编辑  (opens new window)
  上次更新: 2025/10/09, 07:47:31
