Home
Utils

Task Runner

11/14/2025, 3:30:43 AM modified by Marvin

A utility to run concurrent tasks with retry support.

Installation

Loading...

Preview

Nothing to show.
type Task<T> = () => Promise<T>;export interface ConcurrentRunnerOptions {  /**   * 最大并发数,默认无限制。   */  concurrency?: number;  /**   * 每个任务允许的最大重试次数(不含首次执行),默认 3 次。   */  maxRetries?: number;  /**   * 每次重试之间的等待时长,单位毫秒,默认 3000ms。   */  retryDelay?: number;}export class ConcurrentRunner {  private readonly concurrency: number;  private readonly maxRetries: number;  private readonly retryDelay: number;  constructor(options: ConcurrentRunnerOptions = {}) {    const { concurrency = Number.POSITIVE_INFINITY, maxRetries = 3, retryDelay = 3_000 } = options;    if (!Number.isFinite(concurrency) || concurrency <= 0) {      throw new Error('concurrency must be a positive finite number');    }    this.concurrency = Math.floor(concurrency);    this.maxRetries = Math.max(0, Math.floor(maxRetries));    this.retryDelay = Math.max(0, Math.floor(retryDelay));  }  async run<T>(tasks: Task<T>[]): Promise<T[]> {    if (!Array.isArray(tasks)) {      throw new Error('tasks must be an array of promise-returning functions');    }    const total = tasks.length;    if (total === 0) {      return [];    }    const results: T[] = new Array(total);    let cursor = 0;    const worker = async () => {      while (true) {        const index = cursor++;        if (index >= total) {          return;        }        const task = tasks[index];        if (typeof task !== 'function') {          throw new Error(`task at index ${index} is not a function`);        }        results[index] = await this.executeWithRetry(task);      }    };    const workerCount = Math.min(total, this.concurrency);    await Promise.all(Array.from({ length: workerCount }, () => worker()));    return results;  }  private async executeWithRetry<T>(task: Task<T>): Promise<T> {    let attempt = 0;    let lastError: unknown;    while (attempt <= this.maxRetries) {      try {        return await task();      } catch (error) {        lastError = error;        attempt += 1;        if (attempt > this.maxRetries) {          break;        }        if (this.retryDelay > 0) {          await new Promise<void>((resolve) => {            setTimeout(resolve, this.retryDelay);          });        }      }    }    throw lastError;  }}
import { describe, expect, test, vi } from 'vitest';import { ConcurrentRunner } from './index';describe('ConcurrentRunner', () => {  test('执行多个任务并保持结果顺序', async () => {    const runner = new ConcurrentRunner({ concurrency: 2, retryDelay: 10 });    const tasks = [vi.fn(async () => 'task 1'), vi.fn(async () => 'task 2'), vi.fn(async () => 'task 3')];    const results = await runner.run(tasks);    console.log('results', results);    expect(results).toEqual(['task 1', 'task 2', 'task 3']);    tasks.forEach((task) => {      expect(task).toHaveBeenCalledTimes(1);    });  });  test('任务失败后自动重试直至成功', async () => {    const runner = new ConcurrentRunner({ concurrency: 1, retryDelay: 10 });    let attempt = 0;    const task = vi.fn(async () => {      attempt += 1;      if (attempt < 3) {        throw new Error('temporary error');      }      return 'success';    });    const results = await runner.run([task]);    expect(results).toEqual(['success']);    expect(task).toHaveBeenCalledTimes(3);  });  test('超过最大重试次数后抛出错误', async () => {    const runner = new ConcurrentRunner({ concurrency: 1, retryDelay: 10 });    const error = new Error('persistent error');    const task = vi.fn(async () => {      throw error;    });    await expect(runner.run([task])).rejects.toBe(error);    expect(task).toHaveBeenCalledTimes(4);  });});

Git Commit History(1 commits)

feat: build

Marvin
11月14日 03:30
e50053c2