A simple and efficient framework-agnostic queue system for Node.js applications


@boringnode/queue

A simple and efficient queue system for Node.js applications. @boringnode/queue lets you dispatch background jobs and process them asynchronously, with support for multiple queue adapters.

Installation

npm install @boringnode/queue

Features

  • Multiple Queue Adapters: Redis, Knex (PostgreSQL, MySQL, SQLite), and Sync
  • Type-Safe Jobs: TypeScript classes with typed payloads
  • Delayed Jobs: Schedule jobs to run after a delay
  • Priority Queues: Process high-priority jobs first
  • Bulk Dispatch: Efficiently dispatch thousands of jobs at once
  • Job Grouping: Organize related jobs for monitoring
  • Retry with Backoff: Exponential, linear, or fixed backoff strategies
  • Job Timeout: Fail or retry jobs that exceed a time limit
  • Job History: Retain completed/failed jobs for debugging
  • Scheduled Jobs: Cron or interval-based recurring jobs
  • Auto-Discovery: Automatically register jobs from specified locations

Quick Start

1. Define a Job

import { Job } from '@boringnode/queue'
import type { JobOptions } from '@boringnode/queue/types'

interface SendEmailPayload {
  to: string
}

export default class SendEmailJob extends Job<SendEmailPayload> {
  static options: JobOptions = {
    queue: 'email',
  }

  async execute(): Promise<void> {
    console.log(`Sending email to: ${this.payload.to}`)
  }
}

Note

The job name defaults to the class name (SendEmailJob). You can override it with name: 'CustomName' in options.

Warning

If you minify your code in production, class names may be mangled. Always specify name explicitly in your job options.
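
The fallback described in the note can be pictured roughly as follows. This is a minimal sketch, not the library's code: `resolveJobName`, `NamedJobOptions`, and `JobClassLike` are hypothetical names, and the classes `a` and `b` stand in for what a minifier might emit.

```typescript
// Hypothetical shapes for illustration; not the library's actual types.
interface NamedJobOptions {
  name?: string
}

interface JobClassLike {
  name: string // runtime class name (may be mangled by a minifier)
  options?: NamedJobOptions
}

// Prefer the explicit name; fall back to the runtime class name.
function resolveJobName(jobClass: JobClassLike): string {
  return jobClass.options?.name ?? jobClass.name
}

// Minified class WITH an explicit name: the job name survives.
class a {
  static options: NamedJobOptions = { name: 'SendEmailJob' }
}

// Minified class WITHOUT an explicit name: the job name becomes 'b'.
class b {
  static options: NamedJobOptions = {}
}

console.log(resolveJobName(a)) // SendEmailJob
console.log(resolveJobName(b)) // b
```

If a dispatcher and a worker are built separately, a mangled name such as `b` may not match on both sides, which is why setting `name` explicitly is the safer choice.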

2. Configure the Queue Manager

import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },
  locations: ['./app/jobs/**/*.ts'],
})

3. Dispatch Jobs

// Simple dispatch
await SendEmailJob.dispatch({ to: 'user@example.com' })

// With options
await SendEmailJob.dispatch({ to: 'user@example.com' })
  .toQueue('high-priority')
  .priority(1)
  .in('5m')

4. Start a Worker

import { Worker } from '@boringnode/queue'

// `config` is defined as shown under Worker Configuration
const worker = new Worker(config)
await worker.start(['default', 'email'])

Bulk Dispatch

Efficiently dispatch thousands of jobs in a single batch operation:

const { jobIds } = await SendEmailJob.dispatchMany([
  { to: 'user1@example.com' },
  { to: 'user2@example.com' },
  { to: 'user3@example.com' },
])
  .group('newsletter-jan-2025')
  .toQueue('emails')
  .priority(3)

console.log(`Dispatched ${jobIds.length} jobs`)

Depending on the adapter, the batch is written with a Redis MULTI/EXEC transaction or a SQL batch insert, rather than one write per job.

Job Grouping

Organize related jobs together for monitoring and filtering:

// Group newsletter jobs
await SendEmailJob.dispatch({ to: 'user@example.com' })
  .group('newsletter-jan-2025')

// Group with bulk dispatch
await SendEmailJob.dispatchMany(recipients)
  .group('newsletter-jan-2025')

The groupId is stored with job data and accessible via job.data.groupId.
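
As an illustration of filtering on that field, here is a small sketch that tallies job records per group. Only `data.groupId` comes from the text above; the `JobRecord` shape, the `countByGroup` helper, and the `'ungrouped'` bucket are assumptions made for the example.

```typescript
// Hypothetical record shape: only `data.groupId` is taken from the README.
interface JobRecord {
  id: string
  data: { groupId?: string }
}

// Count jobs per group; jobs without a groupId are tallied under 'ungrouped'.
function countByGroup(jobs: JobRecord[]): Map<string, number> {
  const counts = new Map<string, number>()
  for (const job of jobs) {
    const key = job.data.groupId ?? 'ungrouped'
    counts.set(key, (counts.get(key) ?? 0) + 1)
  }
  return counts
}

const sampleJobs: JobRecord[] = [
  { id: '1', data: { groupId: 'newsletter-jan-2025' } },
  { id: '2', data: { groupId: 'newsletter-jan-2025' } },
  { id: '3', data: {} },
]

console.log(countByGroup(sampleJobs).get('newsletter-jan-2025')) // 2
```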

Job History & Retention

Keep completed and failed jobs for debugging:

export default class ImportantJob extends Job<Payload> {
  static options: JobOptions = {
    // Keep last 1000 completed jobs
    removeOnComplete: { count: 1000 },

    // Keep failed jobs for 7 days
    removeOnFail: { age: '7d' },
  }
}

Retention options

| Value | Behavior |
| --- | --- |
| true (default) | Remove immediately |
| false | Keep forever |
| { count: n } | Keep last n jobs |
| { age: '7d' } | Keep for duration |
| { count: 100, age: '1d' } | Both limits apply |
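
To make the retention rules concrete, the sketch below applies a policy to a list of finished jobs. It is an illustration under stated assumptions, not the library's implementation: `applyRetention` and `FinishedJob` are hypothetical, `age` is given in milliseconds instead of a duration string, and "both limits apply" is read as "a job is kept only if it satisfies both".

```typescript
// Hypothetical types; the option shapes mirror the retention options.
type Retention = boolean | { count?: number; age?: number } // age in ms here

interface FinishedJob {
  id: string
  finishedAt: number // epoch milliseconds
}

// Returns the jobs to keep, newest first.
// Assumption: when both limits are set, a job must satisfy both to be kept.
function applyRetention(jobs: FinishedJob[], policy: Retention, now: number): FinishedJob[] {
  if (policy === true) return [] // remove immediately
  const newestFirst = [...jobs].sort((x, y) => y.finishedAt - x.finishedAt)
  if (policy === false) return newestFirst // keep forever

  let kept = newestFirst
  if (policy.age !== undefined) {
    const cutoff = now - policy.age
    kept = kept.filter((job) => job.finishedAt >= cutoff)
  }
  if (policy.count !== undefined) {
    kept = kept.slice(0, policy.count)
  }
  return kept
}

const finished: FinishedJob[] = [
  { id: 'a', finishedAt: 9000 },
  { id: 'b', finishedAt: 5000 },
  { id: 'c', finishedAt: 1000 },
]

const keptIds = applyRetention(finished, { count: 1, age: 6000 }, 10000).map((job) => job.id)
console.log(keptIds) // [ 'a' ]
```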

Query job history:

const job = await adapter.getJob('job-id', 'queue-name')
console.log(job.status)      // 'completed' | 'failed'
console.log(job.finishedAt)  // timestamp
console.log(job.error)       // error message (if failed)

Adapters

Redis (recommended for production)

import { Redis } from 'ioredis'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

// With connection options
const adapter = redis({ host: 'localhost', port: 6379 })

// With an existing ioredis instance
const connection = new Redis({ host: 'localhost' })
const sharedAdapter = redis(connection)

Knex (PostgreSQL, MySQL, SQLite)

import { knex } from '@boringnode/queue/drivers/knex_adapter'

const adapter = knex({
  client: 'pg',
  connection: { host: 'localhost', database: 'myapp' },
})

More Knex examples

// With existing Knex instance
import Knex from 'knex'
const connection = Knex({ client: 'pg', connection: '...' })
const adapter = knex(connection)

// Custom table name (`config` is a Knex configuration object like the one above)
const customTableAdapter = knex(config, 'custom_jobs_table')

The adapter automatically creates tables on first use.

Sync (for testing)

import { sync } from '@boringnode/queue/drivers/sync_adapter'

const adapter = sync() // Jobs execute immediately

Job Options

export default class MyJob extends Job<Payload> {
  static options: JobOptions = {
    queue: 'email',       // Queue name (default: 'default')
    priority: 1,          // Lower = higher priority (default: 5)
    maxRetries: 3,        // Retry attempts before failing
    timeout: '30s',       // Max execution time
    failOnTimeout: true,  // Fail permanently on timeout (default: retry)
    removeOnComplete: { count: 100 },  // Keep last 100 completed
    removeOnFail: { age: '7d' },       // Keep failed for 7 days
  }
}
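
The "lower = higher priority" rule can be sketched as a sort. This is only an illustration: `QueuedJob` and `processingOrder` are hypothetical, and breaking ties by enqueue time (FIFO) is an assumption the README does not state.

```typescript
// Hypothetical queue entry; `priority` follows the "lower = higher" rule.
interface QueuedJob {
  id: string
  priority?: number
  enqueuedAt: number
}

const DEFAULT_PRIORITY = 5 // default stated in the job options

// Lower priority value first; ties fall back to enqueue time (assumed FIFO).
function processingOrder(jobs: QueuedJob[]): string[] {
  return [...jobs]
    .sort((x, y) => {
      const byPriority = (x.priority ?? DEFAULT_PRIORITY) - (y.priority ?? DEFAULT_PRIORITY)
      return byPriority !== 0 ? byPriority : x.enqueuedAt - y.enqueuedAt
    })
    .map((job) => job.id)
}

const queued: QueuedJob[] = [
  { id: 'report', priority: 10, enqueuedAt: 1 },
  { id: 'welcome', enqueuedAt: 2 },
  { id: 'reset', priority: 1, enqueuedAt: 3 },
  { id: 'digest', enqueuedAt: 4 },
]

console.log(processingOrder(queued)) // [ 'reset', 'welcome', 'digest', 'report' ]
```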

Delayed Jobs

await SendEmailJob.dispatch(payload).in('30s')  // 30 seconds
await SendEmailJob.dispatch(payload).in('5m')   // 5 minutes
await SendEmailJob.dispatch(payload).in('2h')   // 2 hours
await SendEmailJob.dispatch(payload).in('1d')   // 1 day
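
For readers who want to see what such duration strings amount to in milliseconds, here is a minimal parser sketch. `parseDuration` is a hypothetical helper written for this example, and the accepted units (ms, s, m, h, d) are an assumption based on the values shown above; the library's own parsing may differ.

```typescript
// Milliseconds per unit. The supported unit set is an assumption.
const UNIT_MS = { ms: 1, s: 1000, m: 60000, h: 3600000, d: 86400000 } as const
type Unit = keyof typeof UNIT_MS

// Convert strings such as '30s' or '5m' into milliseconds.
function parseDuration(input: string): number {
  const match = /^(\d+)(ms|s|m|h|d)$/.exec(input.trim())
  if (!match) {
    throw new Error(`Invalid duration: ${input}`)
  }
  const amount = Number(match[1])
  const unit = match[2] as Unit
  return amount * UNIT_MS[unit]
}

console.log(parseDuration('30s')) // 30000
console.log(parseDuration('5m')) // 300000
```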

Retry & Backoff

import { exponentialBackoff } from '@boringnode/queue'

export default class ReliableJob extends Job<Payload> {
  static options: JobOptions = {
    maxRetries: 5,
    retry: {
      backoff: () => exponentialBackoff({
        baseDelay: '1s',
        maxDelay: '1m',
        multiplier: 2,
        jitter: true,
      }),
    },
  }
}

Available strategies

import { exponentialBackoff, linearBackoff, fixedBackoff } from '@boringnode/queue'

// Exponential: 1s, 2s, 4s, 8s...
exponentialBackoff({ baseDelay: '1s', maxDelay: '1m', multiplier: 2 })

// Linear: 1s, 2s, 3s, 4s...
linearBackoff({ baseDelay: '1s', maxDelay: '30s', multiplier: 1 })

// Fixed: 5s, 5s, 5s...
fixedBackoff({ baseDelay: '5s', jitter: true })
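
The delay sequences in those comments can be reproduced with a few lines of arithmetic. The formulas below are assumptions chosen to match the documented sequences, not the library's source; all function names are hypothetical, durations are in milliseconds, and the jitter variant shown ("full jitter") is just one common choice.

```typescript
// Delay before retry number `attempt` (1-based), capped at maxDelay.
function exponentialDelay(attempt: number, baseDelay: number, multiplier: number, maxDelay: number): number {
  return Math.min(baseDelay * multiplier ** (attempt - 1), maxDelay)
}

// Grows by `multiplier * baseDelay` per attempt, capped at maxDelay.
function linearDelay(attempt: number, baseDelay: number, multiplier: number, maxDelay: number): number {
  return Math.min(baseDelay * (1 + multiplier * (attempt - 1)), maxDelay)
}

// Same delay on every attempt.
function fixedDelay(baseDelay: number): number {
  return baseDelay
}

// "Full jitter": a random delay in [0, delay). Assumed variant, for illustration.
function withJitter(delay: number, random: () => number = Math.random): number {
  return Math.floor(random() * delay)
}

const attempts = [1, 2, 3, 4, 5, 6, 7]
console.log(attempts.map((n) => exponentialDelay(n, 1000, 2, 60000)))
// [ 1000, 2000, 4000, 8000, 16000, 32000, 60000 ]
console.log(attempts.slice(0, 4).map((n) => linearDelay(n, 1000, 1, 30000)))
// [ 1000, 2000, 3000, 4000 ]
```

Jitter spreads retries out so that many jobs failing at the same moment do not all retry at the same moment.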

Job Timeout

export default class LongRunningJob extends Job<Payload> {
  static options: JobOptions = {
    timeout: '30s',
    failOnTimeout: false, // Will retry (default)
  }

  async execute(): Promise<void> {
    for (const item of this.payload.items) {
      // Check abort signal for graceful timeout handling
      if (this.signal?.aborted) {
        throw new Error('Job timed out')
      }
      // processItem() stands for your own per-item logic (not shown)
      await this.processItem(item)
    }
  }
}
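
The cooperative check against the abort signal can be tried in isolation with the standard `AbortController`. This is a sketch under assumptions: `processUntilAborted` is a hypothetical helper, and the call to `abort()` inside the handler stands in for the timer a worker would presumably use.

```typescript
// Processes items one at a time, checking the signal before each one.
// Returns the number of items handled if the signal never fires.
function processUntilAborted<T>(items: T[], signal: AbortSignal, handle: (item: T) => void): number {
  let processed = 0
  for (const item of items) {
    if (signal.aborted) {
      throw new Error(`Job timed out after ${processed} item(s)`)
    }
    handle(item)
    processed++
  }
  return processed
}

const controller = new AbortController()
const handled: number[] = []

try {
  processUntilAborted([1, 2, 3, 4], controller.signal, (item) => {
    handled.push(item)
    if (item === 2) controller.abort() // stand-in for the timeout firing
  })
} catch (error) {
  console.log((error as Error).message) // Job timed out after 2 item(s)
}

console.log(handled) // [ 1, 2 ]
```

Because the check happens between items, an item that is already running finishes before the job stops; long-running steps would need to observe the signal themselves.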

Job Context

Access execution metadata via this.context:

async execute(): Promise<void> {
  console.log(this.context.jobId)       // Unique job ID
  console.log(this.context.attempt)     // 1, 2, 3...
  console.log(this.context.queue)       // Queue name
  console.log(this.context.priority)    // Priority value
  console.log(this.context.acquiredAt)  // When acquired
  console.log(this.context.stalledCount) // Stall recoveries
}

Scheduled Jobs

Run jobs on a recurring basis:

// Every 10 seconds
await MetricsJob.schedule({ endpoint: '/health' })
  .every('10s')

// Cron schedule
await CleanupJob.schedule({ days: 30 })
  .id('daily-cleanup')
  .cron('0 0 * * *')  // Midnight daily
  .timezone('Europe/Paris')

Schedule management

import { Schedule } from '@boringnode/queue'

// Find and manage
const schedule = await Schedule.find('daily-cleanup')
await schedule.pause()
await schedule.resume()
await schedule.trigger()  // Run now
await schedule.delete()

// List schedules
const all = await Schedule.list()
const active = await Schedule.list({ status: 'active' })

Schedule options:

| Method | Description |
| --- | --- |
| .id(string) | Unique identifier |
| .every(duration) | Fixed interval ('5s', '1m', '1h') |
| .cron(expression) | Cron schedule |
| .timezone(tz) | Timezone (default: 'UTC') |
| .from(date) | Start boundary |
| .to(date) | End boundary |
| .limit(n) | Maximum runs |
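
How `.every()`, `.from()`, `.to()`, and `.limit()` might interact can be sketched as a "next run" calculation. This models interval schedules only and is an assumption about the semantics, not the library's scheduler: `IntervalSchedule` and `nextRun` are hypothetical names, and the interval is given in milliseconds.

```typescript
// Hypothetical model of an interval schedule; fields mirror the builder methods.
interface IntervalSchedule {
  everyMs: number
  from?: Date
  to?: Date
  limit?: number
}

// Returns the next run time after `lastRun`, or null once the schedule is exhausted.
function nextRun(schedule: IntervalSchedule, lastRun: Date, runCount: number): Date | null {
  // Stop when the maximum number of runs has been reached.
  if (schedule.limit !== undefined && runCount >= schedule.limit) return null

  let next = new Date(lastRun.getTime() + schedule.everyMs)
  // Never run before the start boundary.
  if (schedule.from && next.getTime() < schedule.from.getTime()) next = schedule.from
  // Never run after the end boundary.
  if (schedule.to && next.getTime() > schedule.to.getTime()) return null
  return next
}

const everyTenSeconds: IntervalSchedule = { everyMs: 10000, limit: 3 }
console.log(nextRun(everyTenSeconds, new Date(0), 0)?.getTime()) // 10000
console.log(nextRun(everyTenSeconds, new Date(20000), 3)) // null
```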

Dependency Injection

Integrate with IoC containers:

await QueueManager.init({
  // ...
  jobFactory: async (JobClass) => {
    return app.container.make(JobClass)
  },
})

Example with injected services

export default class SendEmailJob extends Job<SendEmailPayload> {
  constructor(
    private mailer: MailerService,
    private logger: Logger
  ) {
    super()
  }

  async execute(): Promise<void> {
    this.logger.info(`Sending email to ${this.payload.to}`)
    await this.mailer.send(this.payload)
  }
}
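
If you do not already have an IoC container, the idea behind `jobFactory` can be tried with a very small one. Everything here is a hypothetical stand-in (`Container`, `Mailer`, `WelcomeJob`); only the factory's shape, a function that receives a job class and returns an instance, follows the configuration shown above.

```typescript
type Constructor<T> = new (...args: any[]) => T

// A deliberately tiny container: each class is registered with a resolver
// that knows how to build it.
class Container {
  private resolvers = new Map<Constructor<unknown>, () => unknown>()

  bind<T>(target: Constructor<T>, resolver: () => T): void {
    this.resolvers.set(target, resolver)
  }

  make<T>(target: Constructor<T>): T {
    const resolver = this.resolvers.get(target)
    if (!resolver) throw new Error(`No binding for ${target.name}`)
    return resolver() as T
  }
}

class Mailer {
  sent: string[] = []
  send(to: string): void {
    this.sent.push(to)
  }
}

class WelcomeJob {
  private mailer: Mailer
  constructor(mailer: Mailer) {
    this.mailer = mailer
  }
  execute(to: string): void {
    this.mailer.send(to)
  }
}

const container = new Container()
const mailer = new Mailer()
container.bind(Mailer, () => mailer)
container.bind(WelcomeJob, () => new WelcomeJob(container.make(Mailer)))

// Same shape as the jobFactory option: class in, instance out.
const jobFactory = async <T>(JobClass: Constructor<T>): Promise<T> => container.make(JobClass)

container.make(WelcomeJob).execute('user@example.com')
console.log(mailer.sent) // [ 'user@example.com' ]
```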

Worker Configuration

const config = {
  worker: {
    concurrency: 5,        // Parallel jobs
    idleDelay: '2s',       // Poll interval when idle
    timeout: '1m',         // Default job timeout
    stalledThreshold: '30s', // When to consider job stalled
    stalledInterval: '30s',  // How often to check
    maxStalledCount: 1,      // Max recoveries before failing
    gracefulShutdown: true,  // Wait for jobs on SIGTERM
  },
}
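
The three stalled-job settings can be read as a simple decision rule. The sketch below is one plausible interpretation, not the library's logic: `checkStalled` and `ActiveJob` are hypothetical, times are in milliseconds, and `acquiredAt` is used as the reference point even though a real worker may track a heartbeat instead.

```typescript
// Hypothetical view of a job currently held by a worker.
interface ActiveJob {
  id: string
  acquiredAt: number // epoch ms when a worker picked the job up
  stalledCount: number // how many times it has already been recovered
}

type StallDecision = 'running' | 'recover' | 'fail'

// Decide what a periodic stalled-job check would do with one job.
function checkStalled(job: ActiveJob, now: number, stalledThresholdMs: number, maxStalledCount: number): StallDecision {
  const heldFor = now - job.acquiredAt
  if (heldFor <= stalledThresholdMs) return 'running'
  // Past the threshold: recover while recoveries remain, otherwise fail.
  return job.stalledCount < maxStalledCount ? 'recover' : 'fail'
}

const firstStall: ActiveJob = { id: 'job-1', acquiredAt: 0, stalledCount: 0 }
console.log(checkStalled(firstStall, 10000, 30000, 1)) // running
console.log(checkStalled(firstStall, 31000, 30000, 1)) // recover
console.log(checkStalled({ ...firstStall, stalledCount: 1 }, 31000, 30000, 1)) // fail
```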

Logging

import { pino } from 'pino'

await QueueManager.init({
  // ...
  logger: pino(),
})

Benchmarks

Performance comparison with BullMQ (5ms simulated work per job):

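| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff |
| --- | --- | --- | --- | --- |
| 1000 | 5 | 1096ms | 1116ms | 1.8% faster |
| 1000 | 10 | 565ms | 579ms | 2.4% faster |
| 100K | 10 | 56.2s | 57.5s | 2.1% faster |
| 100K | 20 | 29.1s | 29.6s | 1.7% faster |

To run the benchmark:

npm run benchmark -- --realistic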
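
For context when reading these numbers, it can help to compare them with the ideal time for the simulated work alone. The sketch below is plain arithmetic on the published figures, assuming jobs are spread evenly across the concurrency slots; `idealMs` and `overheadPercent` are hypothetical helpers, not part of the benchmark suite.

```typescript
// Ideal wall-clock time if jobs are spread evenly across `concurrency` slots.
function idealMs(jobs: number, concurrency: number, workMs: number): number {
  return Math.ceil(jobs / concurrency) * workMs
}

// Measured time above the ideal, as a percentage rounded to one decimal.
function overheadPercent(measuredMs: number, ideal: number): number {
  return Math.round(((measuredMs - ideal) / ideal) * 1000) / 10
}

console.log(idealMs(1000, 5, 5)) // 1000
console.log(overheadPercent(1096, idealMs(1000, 5, 5))) // 9.6
console.log(idealMs(100000, 20, 5)) // 25000
console.log(overheadPercent(29100, idealMs(100000, 20, 5))) // 16.4
```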
