Task Queue
特性
实现一个任务队列,解藕业务代码,支持如下特性
- 类型增强
- 错误重试
- 异常处理
- 任务并发
- 降频节流
- 任务隔离
设计与思考
FIFO
最简单FIFO 设计,分为“读写”两端,从队列中取出任务执行,如果队列空则开启利用 “事件循环” 机制轮询检查任务队列。
使用requestAnimationFrame 来做轮询的操。Why not requestIdleCallback;
const checkQueue = () => {
requestAnimationFrame(() => {
// 'will be called before the next repaint.'
checkQueue()
})
}
通过 push 向队列中添加任务
queueRef.current.push(newTask)
从队列中取出任务执行,
const newTask = queueRef.current.shift()
并发
通过异步实现并发执行来提高效率,并发需要考虑到 竞态条件,任务追踪,错误重试,等问题。
为了解决竞态条件带来的脏数据问题,不直接操作原数据,通过设置并发数,从任务队列中取出任务[less than 当前可并发个数],随后放入并发队列中。 当任务执行失败时,执行重试机制,任务执行成功或重试异常时将任务从并发队列中移除,适时抛出异常。
源码
// @ts-nocheck ignore mdx ts check
import {useCallback, useEffect, useRef} from "react";
requestIdleCallback
type TaskStatus = "pending" | "running" | "completed" | "failed";
interface Task<T = any> {
id: string;
invoke: () => Promise<T> | T;
status: TaskStatus;
retries?: number;
createdAt: number;
}
interface TaskQueueConfig {
concurrency?: number;
retry?: number;
retryInterval?: number;
}
const DEFAULT_CONFIG: Required<TaskQueueConfig> = {
concurrency: 3,
retry: 2,
retryInterval: 1000,
};
function useTask(config?: TaskQueueConfig) {
const queueRef = useRef<Task[]>([]);
const activeTasksRef = useRef<Set<string>>(new Set());
const isMountedRef = useRef(true);
const configRef = useRef({ ...DEFAULT_CONFIG, ...config });
// 任务处理器
const processTask = useCallback(async () => {
if (
!isMountedRef.current ||
activeTasksRef.current.size >= configRef.current.concurrency ||
queueRef.current.length === 0
) {
return;
}
const availableSlots =
configRef.current.concurrency - activeTasksRef.current.size;
const tasksToProcess = queueRef.current.splice(0, availableSlots);
await Promise.allSettled(
tasksToProcess.map(async (task) => {
try {
task.status = "running";
activeTasksRef.current.add(task.id);
await task.invoke();
task.status = "completed";
} catch (error) {
if ((task.retries || 0) < configRef.current.retry) {
task.retries = (task.retries || 0) + 1;
task.status = "pending";
setTimeout(() => {
queueRef.current.push(task);
processTask();
}, configRef.current.retryInterval);
} else {
task.status = "failed";
console.error(
`Task ${task.id} failed after ${task.retries} retries`,
error,
);
}
} finally {
activeTasksRef.current.delete(task.id);
processTask(); // 处理后续任务
}
}),
);
}, []);
// 队列监听器
const startQueue = useCallback(() => {
const checkQueue = () => {
if (!isMountedRef.current) return;
requestAnimationFrame(() => {
processTask().finally(() => {
if (queueRef.current.length > 0) {
checkQueue();
} else {
setTimeout(checkQueue, 100); // 队列空时降低轮询频率
}
});
});
};
checkQueue();
}, [processTask]);
// 添加任务
const enqueue = useCallback(
<T>(
task: Omit<Task<T>, "status" | "createdAt" | "retries">,
): Promise<T> => {
return new Promise((resolve, reject) => {
const newTask: Task<T> = {
...task,
status: "pending",
createdAt: Date.now(),
retries: 0,
invoke: async () => {
try {
const result = await task.invoke();
resolve(result);
return result;
} catch (error) {
reject(error);
throw error;
}
},
};
queueRef.current.push(newTask);
processTask();
});
},
[processTask],
);
// 初始化
useEffect(() => {
startQueue();
return () => {
isMountedRef.current = false;
};
}, [startQueue]);
return {
enqueue,
getQueueSize: () => queueRef.current.length,
getActiveCount: () => activeTasksRef.current.size,
};
}
export default useTask;