Implement a task queue with controlled concurrency (success/error callbacks, custom executors)
A queue with a configurable max concurrency: `add(task)` returns a promise; running count is tracked; when one completes, the next pending task starts. Supports success/error callbacks, custom executors, and `drain` / `idle` events. Core data structure: a FIFO queue + a `running` counter + a `dequeue()` step on settle.
A concurrency-limited task queue is the canonical async control problem. Implementations differ in API; the engine is small.
The core engine
```js
class TaskQueue {
  constructor({ concurrency = 1 } = {}) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  add(task) { // task: () => Promise<any>
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this._dequeue();
    });
  }

  _dequeue() {
    while (this.running < this.concurrency && this.queue.length) {
      const { task, resolve, reject } = this.queue.shift();
      this.running++;
      Promise.resolve()
        .then(task)
        .then((value) => { resolve(value); }, (err) => { reject(err); })
        .finally(() => {
          this.running--;
          this._dequeue();
        });
    }
  }

  get pending() { return this.queue.length; }
  get active() { return this.running; }
}
```

Usage
```js
const q = new TaskQueue({ concurrency: 3 });
const results = await Promise.all(
  urls.map((u) => q.add(() => fetch(u).then((r) => r.json())))
);
```

Extensions
Success / error callbacks per add

```js
q.add(task, { onSuccess: (v) => log("ok", v), onError: (e) => log("err", e) });
```

Custom executor
Allow injecting a wrapper (retry, timeout, telemetry):

```js
new TaskQueue({ concurrency: 3, executor: (task) => withRetry(task, 3) });
```

Drain / idle event
```js
// requires `this._idleResolvers = [];` in the constructor
onIdle() { // resolves when queue + running = 0
  if (this.running === 0 && this.queue.length === 0) return Promise.resolve();
  return new Promise((res) => this._idleResolvers.push(res));
}
// inside _dequeue's finally, after running--: if both counters hit 0,
// call every resolver in _idleResolvers, then clear the array
```

Priority
Use a heap or sorted array instead of FIFO; add(task, { priority }).
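A minimal sketch of the priority variant, assuming the engine above (the class name `PriorityTaskQueue` is illustrative). It swaps the FIFO push for a sorted insert; the O(n) `findIndex` is fine for small queues, but use a binary heap if priority insertion is hot.

```js
// Priority scheduling: pending entries are kept sorted by descending priority,
// so _dequeue's shift() always starts the highest-priority waiting task.
class PriorityTaskQueue {
  constructor({ concurrency = 1 } = {}) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }
  add(task, { priority = 0 } = {}) {
    return new Promise((resolve, reject) => {
      const entry = { task, resolve, reject, priority };
      // O(n) sorted insert; swap for a binary heap if the queue gets large
      const i = this.queue.findIndex((e) => e.priority < priority);
      if (i === -1) this.queue.push(entry);
      else this.queue.splice(i, 0, entry);
      this._dequeue();
    });
  }
  _dequeue() {
    while (this.running < this.concurrency && this.queue.length) {
      const { task, resolve, reject } = this.queue.shift();
      this.running++;
      Promise.resolve().then(task).then(resolve, reject)
        .finally(() => { this.running--; this._dequeue(); });
    }
  }
}
```

Equal priorities keep FIFO order because the insert scans for the first strictly lower priority.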
Cancellation
Track AbortControllers per task; abort rejects pending or signals running tasks.
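One way to sketch that, assuming the engine above (`AbortableTaskQueue` and the `"aborted"` error message are illustrative): `add` returns `{ promise, abort }`; aborting a still-pending entry removes it from the queue and rejects, while a running task receives the signal and is expected to observe it.

```js
class AbortableTaskQueue {
  constructor({ concurrency = 1 } = {}) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }
  add(task) { // task: (signal) => Promise<any>
    const controller = new AbortController();
    const promise = new Promise((resolve, reject) => {
      const entry = { task, resolve, reject, controller };
      controller.signal.addEventListener("abort", () => {
        const i = this.queue.indexOf(entry);
        if (i !== -1) { // still pending: drop it and reject immediately
          this.queue.splice(i, 1);
          reject(new Error("aborted"));
        } // already running: the task must check signal.aborted itself
      });
      this.queue.push(entry);
      this._dequeue();
    });
    return { promise, abort: () => controller.abort() };
  }
  _dequeue() {
    while (this.running < this.concurrency && this.queue.length) {
      const { task, resolve, reject, controller } = this.queue.shift();
      this.running++;
      Promise.resolve()
        .then(() => task(controller.signal)) // running tasks get the signal
        .then(resolve, reject)
        .finally(() => { this.running--; this._dequeue(); });
    }
  }
}
```

Note that aborting a running task only signals it; the queue cannot forcibly stop arbitrary async work.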
Why this shape
- The queue is the bookkeeping, not the task runner — tasks are arbitrary async functions.
- `Promise.resolve().then(task)` ensures synchronous throws are caught.
- `finally` decrements the counter then re-dequeues — keeps the running count exactly at concurrency.
- The recursive `_dequeue` after each settle keeps the engine simple — no separate tick loop.
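The sync-throw point is easy to demonstrate in isolation: calling `task()` directly lets a synchronous throw escape before any promise exists, while `Promise.resolve().then(task)` routes the same throw into the rejection path the queue already handles.

```js
const task = () => { throw new Error("boom"); }; // throws synchronously

// Direct call: the throw escapes the promise machinery entirely.
try { task(); } catch (e) { console.log("escaped:", e.message); }

// Wrapped: the same throw becomes an ordinary rejection.
Promise.resolve()
  .then(task)
  .catch((e) => console.log("caught as rejection:", e.message));
```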
Subtle gotchas
- Sync throws in `task` — wrap with `Promise.resolve().then(task)` so they become rejections.
- Memory leak — clear settled handlers if you keep references.
- Backpressure — if callers keep adding faster than you drain, the queue grows unboundedly. Optionally cap `queue.length` and reject.
- Order — start order is preserved (FIFO); completion order is not guaranteed when concurrency > 1.
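The backpressure cap can be sketched as a small variant of the engine; `BoundedTaskQueue` and the `maxPending` option are illustrative names, and rejection is only one policy (blocking or dropping oldest are alternatives).

```js
// Reject new work once the pending queue hits a cap, instead of growing
// without bound when producers outpace the drain rate.
class BoundedTaskQueue {
  constructor({ concurrency = 1, maxPending = Infinity } = {}) {
    Object.assign(this, { concurrency, maxPending, running: 0, queue: [] });
  }
  add(task) {
    if (this.queue.length >= this.maxPending) {
      return Promise.reject(new Error("queue is full")); // backpressure signal
    }
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this._dequeue();
    });
  }
  _dequeue() {
    while (this.running < this.concurrency && this.queue.length) {
      const { task, resolve, reject } = this.queue.shift();
      this.running++;
      Promise.resolve().then(task).then(resolve, reject)
        .finally(() => { this.running--; this._dequeue(); });
    }
  }
}
```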
Interview framing
"A concurrency-limited queue keeps a running counter and a FIFO of pending tasks. add returns a promise that resolves when the task settles. After each settle, decrement and re-dequeue — that recursion is the engine. Wrap the task in Promise.resolve().then so sync throws become rejections. Extensions are orthogonal: custom executor for retries/timeouts, priority for non-FIFO, an idle promise for drain, AbortController for cancellation. Backpressure matters in production — if callers add faster than we drain, cap or reject."
Follow-up questions
- Why use `Promise.resolve().then(task)` instead of calling `task()` directly?
- How would you add cancellation?
- How would you implement priority?
- How does this compare to p-limit / p-queue?
Common mistakes
- Calling `task()` synchronously and missing thrown errors.
- Decrementing `running` before the task settles.
- Forgetting to re-dequeue after a `finally` → queue stalls.
- Unbounded queue growth — no backpressure.
Performance considerations
- Engine cost is O(1) per task. Pick a real priority queue if priority is hot. Avoid `array.shift()` in hot paths — use an index-based dequeue.
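`shift()` is O(n) because it reindexes the whole backing array. An index-based dequeue, sketched below (`FastQueue` is an illustrative name), advances a head pointer instead and compacts occasionally so consumed slots don't leak.

```js
// O(1) amortized dequeue: advance a head index instead of shifting.
class FastQueue {
  constructor() { this.items = []; this.head = 0; }
  push(x) { this.items.push(x); }
  shift() {
    if (this.head >= this.items.length) return undefined;
    const x = this.items[this.head++];
    // compact once consumed slots dominate the array, to bound memory
    if (this.head > 1024 && this.head * 2 > this.items.length) {
      this.items = this.items.slice(this.head);
      this.head = 0;
    }
    return x;
  }
  get length() { return this.items.length - this.head; }
}
```

Since it exposes `push` / `shift` / `length`, it can replace the plain array in the engine without touching `_dequeue`.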
Edge cases
- Empty queue, all tasks finished — `onIdle` resolves immediately.
- Sync throws inside a task.
- Tasks that never settle — `running` counter never decrements (add timeouts).
- Adding while draining.
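For the never-settling case, a timeout wrapper (`withTimeout` is an illustrative helper, a natural fit for the custom-executor hook) bounds how long a task can hold a slot. Note it only frees the slot; it does not stop the underlying work — combine with `AbortController` if the task supports it.

```js
// Race the task against a timer so a hung task can't pin a slot forever.
const withTimeout = (task, ms) => () =>
  new Promise((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error("task timed out")), ms);
    Promise.resolve()
      .then(task)              // sync throws still become rejections
      .then(resolve, reject)
      .finally(() => clearTimeout(timer)); // don't leak the timer on success
  });

// usage: q.add(withTimeout(someTask, 5000));
```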
Real-world examples
- p-limit, p-queue, async.queue.
- Image upload concurrency limiter, request fan-out caps, web worker pool.