Skip to content
📚 5 min read

Batch Throttling Examples ​

Learn how to implement batch processing with throttling for better performance and resource management.

Basic Usage ​

typescript
// Simple batch processor
async function processBatch<T, R>(
  items: T[],
  batchSize: number,
  processor: (batch: T[]) => Promise<R[]>
): Promise<R[]> {
  const results: R[] = [];

  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    const batchResults = await processor(batch);
    results.push(...batchResults);
  }

  return results;
}

// Batch processor with delay
async function processBatchWithDelay<T, R>(
  items: T[],
  batchSize: number,
  delayMs: number,
  processor: (batch: T[]) => Promise<R[]>
): Promise<R[]> {
  const results: R[] = [];

  for (let i = 0; i < items.length; i += batchSize) {
    if (i > 0) {
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }

    const batch = items.slice(i, i + batchSize);
    const batchResults = await processor(batch);
    results.push(...batchResults);
  }

  return results;
}

Advanced Patterns ​

Throttled Batch Processor ​

typescript
interface ThrottledBatchOptions<T, R> {
  batchSize: number;
  minDelay: number;
  maxDelay: number;
  processor: (batch: T[]) => Promise<R[]>;
  onBatchComplete?: (results: R[], timing: number) => void;
  onBatchError?: (error: Error, batch: T[]) => void;
}

class ThrottledBatchProcessor<T, R> {
  private queue: T[] = [];
  private processing = false;
  private currentDelay: number;
  private lastProcessTime: number = 0;
  private processingTimes: number[] = [];

  constructor(private options: ThrottledBatchOptions<T, R>) {
    this.currentDelay = options.minDelay;
  }

  async add(items: T[]): Promise<void> {
    this.queue.push(...items);

    if (!this.processing) {
      this.processing = true;
      await this.processQueue();
    }
  }

  private async processQueue(): Promise<void> {
    while (this.queue.length > 0) {
      const timeSinceLastProcess = Date.now() - this.lastProcessTime;
      const timeToWait = Math.max(0, this.currentDelay - timeSinceLastProcess);

      if (timeToWait > 0) {
        await new Promise((resolve) => setTimeout(resolve, timeToWait));
      }

      const batch = this.queue.splice(0, this.options.batchSize);
      const startTime = Date.now();

      try {
        const results = await this.options.processor(batch);
        const processingTime = Date.now() - startTime;

        this.updateProcessingMetrics(processingTime);

        if (this.options.onBatchComplete) {
          this.options.onBatchComplete(results, processingTime);
        }
      } catch (error) {
        if (this.options.onBatchError) {
          this.options.onBatchError(error as Error, batch);
        }
        // Re-queue failed items
        this.queue.unshift(...batch);
        // Increase delay on error
        this.increaseDelay();
      }

      this.lastProcessTime = Date.now();
    }

    this.processing = false;
  }

  private updateProcessingMetrics(processingTime: number): void {
    this.processingTimes.push(processingTime);
    if (this.processingTimes.length > 10) {
      this.processingTimes.shift();
    }

    const avgProcessingTime =
      this.processingTimes.reduce((a, b) => a + b) /
      this.processingTimes.length;
    this.adjustDelay(avgProcessingTime);
  }

  private adjustDelay(avgProcessingTime: number): void {
    if (avgProcessingTime > this.currentDelay) {
      this.increaseDelay();
    } else if (avgProcessingTime < this.currentDelay / 2) {
      this.decreaseDelay();
    }
  }

  private increaseDelay(): void {
    this.currentDelay = Math.min(
      this.options.maxDelay,
      this.currentDelay * 1.5
    );
  }

  private decreaseDelay(): void {
    this.currentDelay = Math.max(
      this.options.minDelay,
      this.currentDelay * 0.8
    );
  }

  getCurrentDelay(): number {
    return this.currentDelay;
  }

  getQueueLength(): number {
    return this.queue.length;
  }
}

Concurrent Batch Processor ​

typescript
interface ConcurrentBatchOptions<T, R> {
  batchSize: number;
  concurrency: number;
  processor: (batch: T[]) => Promise<R[]>;
  onBatchComplete?: (results: R[]) => void;
  onBatchError?: (error: Error, batch: T[]) => void;
}

class ConcurrentBatchProcessor<T, R> {
  private queue: T[] = [];
  private activeProcessors = 0;
  private processing = false;

  constructor(private options: ConcurrentBatchOptions<T, R>) {}

  async add(items: T[]): Promise<void> {
    this.queue.push(...items);

    if (!this.processing) {
      this.processing = true;
      await this.processQueue();
    }
  }

  private async processQueue(): Promise<void> {
    while (this.queue.length > 0) {
      if (this.activeProcessors >= this.options.concurrency) {
        await new Promise((resolve) => setTimeout(resolve, 100));
        continue;
      }

      const batch = this.queue.splice(0, this.options.batchSize);
      this.activeProcessors++;

      this.processBatch(batch).finally(() => {
        this.activeProcessors--;
      });
    }

    // Wait for all processors to complete
    while (this.activeProcessors > 0) {
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    this.processing = false;
  }

  private async processBatch(batch: T[]): Promise<void> {
    try {
      const results = await this.options.processor(batch);

      if (this.options.onBatchComplete) {
        this.options.onBatchComplete(results);
      }
    } catch (error) {
      if (this.options.onBatchError) {
        this.options.onBatchError(error as Error, batch);
      }
      // Re-queue failed items
      this.queue.unshift(...batch);
    }
  }

  getQueueLength(): number {
    return this.queue.length;
  }

  getActiveProcessors(): number {
    return this.activeProcessors;
  }
}

Priority Batch Processor ​

typescript
interface PriorityBatchItem<T> {
  data: T;
  priority: number;
}

interface PriorityBatchOptions<T, R> {
  batchSize: number;
  processor: (batch: T[]) => Promise<R[]>;
  maxDelay: number;
  priorityLevels: number;
}

class PriorityBatchProcessor<T, R> {
  private queues: T[][] = [];
  private processing = false;
  private lastProcessTime: number = 0;

  constructor(private options: PriorityBatchOptions<T, R>) {
    this.queues = Array.from({ length: options.priorityLevels }, () => []);
  }

  add(item: PriorityBatchItem<T>): void {
    this.queues[item.priority].push(item.data);

    if (!this.processing) {
      this.processing = true;
      this.processQueue();
    }
  }

  private async processQueue(): Promise<void> {
    while (this.hasItems()) {
      const timeSinceLastProcess = Date.now() - this.lastProcessTime;
      const timeToWait = Math.max(
        0,
        this.options.maxDelay - timeSinceLastProcess
      );

      if (timeToWait > 0) {
        await new Promise((resolve) => setTimeout(resolve, timeToWait));
      }

      const batch = this.collectNextBatch();

      try {
        await this.options.processor(batch);
      } catch (error) {
        // Re-queue failed items at the same priority level
        const priority = this.findHighestPriorityWithItems();
        this.queues[priority].unshift(...batch);
      }

      this.lastProcessTime = Date.now();
    }

    this.processing = false;
  }

  private hasItems(): boolean {
    return this.queues.some((queue) => queue.length > 0);
  }

  private findHighestPriorityWithItems(): number {
    for (let i = this.queues.length - 1; i >= 0; i--) {
      if (this.queues[i].length > 0) {
        return i;
      }
    }
    return 0;
  }

  private collectNextBatch(): T[] {
    const batch: T[] = [];
    let remaining = this.options.batchSize;

    // Start from highest priority queue
    for (let i = this.queues.length - 1; i >= 0 && remaining > 0; i--) {
      const queue = this.queues[i];
      const itemsToTake = Math.min(remaining, queue.length);

      if (itemsToTake > 0) {
        batch.push(...queue.splice(0, itemsToTake));
        remaining -= itemsToTake;
      }
    }

    return batch;
  }

  getQueueLengths(): number[] {
    return this.queues.map((queue) => queue.length);
  }

  getTotalItems(): number {
    return this.queues.reduce((sum, queue) => sum + queue.length, 0);
  }
}