A simple and efficient queue system for Node.js applications. Built for simplicity and ease of use, @boringnode/queue allows you to dispatch background jobs and process them asynchronously with support for multiple queue adapters.
npm install @boringnode/queue- Multiple Queue Adapters: Redis, Knex (PostgreSQL, MySQL, SQLite), and Sync
- Type-Safe Jobs: TypeScript classes with typed payloads
- Delayed Jobs: Schedule jobs to run after a delay
- Priority Queues: Process high-priority jobs first
- Bulk Dispatch: Efficiently dispatch thousands of jobs at once
- Job Grouping: Organize related jobs for monitoring
- Retry with Backoff: Exponential, linear, or fixed backoff strategies
- Job Timeout: Fail or retry jobs that exceed a time limit
- Job History: Retain completed/failed jobs for debugging
- Scheduled Jobs: Cron or interval-based recurring jobs
- Auto-Discovery: Automatically register jobs from specified locations
import { Job } from '@boringnode/queue'
import type { JobOptions } from '@boringnode/queue/types'
interface SendEmailPayload {
to: string
}
export default class SendEmailJob extends Job<SendEmailPayload> {
static options: JobOptions = {
queue: 'email',
}
async execute(): Promise<void> {
console.log(`Sending email to: ${this.payload.to}`)
}
}Note
The job name defaults to the class name (SendEmailJob). You can override it with name: 'CustomName' in options.
Warning
If you minify your code in production, class names may be mangled. Always specify name explicitly in your job options.
import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'
await QueueManager.init({
default: 'redis',
adapters: {
redis: redis({ host: 'localhost', port: 6379 }),
},
locations: ['./app/jobs/**/*.ts'],
})// Simple dispatch
await SendEmailJob.dispatch({ to: '[email protected]' })
// With options
await SendEmailJob.dispatch({ to: '[email protected]' })
.toQueue('high-priority')
.priority(1)
.in('5m')import { Worker } from '@boringnode/queue'
const worker = new Worker(config)
await worker.start(['default', 'email'])Efficiently dispatch thousands of jobs in a single batch operation:
const { jobIds } = await SendEmailJob.dispatchMany([
{ to: '[email protected]' },
{ to: '[email protected]' },
{ to: '[email protected]' },
])
.group('newsletter-jan-2025')
.toQueue('emails')
.priority(3)
console.log(`Dispatched ${jobIds.length} jobs`)This uses Redis MULTI/EXEC or SQL batch insert for optimal performance.
Organize related jobs together for monitoring and filtering:
// Group newsletter jobs
await SendEmailJob.dispatch({ to: '[email protected]' })
.group('newsletter-jan-2025')
// Group with bulk dispatch
await SendEmailJob.dispatchMany(recipients)
.group('newsletter-jan-2025')The groupId is stored with job data and accessible via job.data.groupId.
Keep completed and failed jobs for debugging:
export default class ImportantJob extends Job<Payload> {
static options: JobOptions = {
// Keep last 1000 completed jobs
removeOnComplete: { count: 1000 },
// Keep failed jobs for 7 days
removeOnFail: { age: '7d' },
}
}Retention options
| Value | Behavior |
|---|---|
true (default) |
Remove immediately |
false |
Keep forever |
{ count: n } |
Keep last n jobs |
{ age: '7d' } |
Keep for duration |
{ count: 100, age: '1d' } |
Both limits apply |
Query job history:
const job = await adapter.getJob('job-id', 'queue-name')
console.log(job.status) // 'completed' | 'failed'
console.log(job.finishedAt) // timestamp
console.log(job.error) // error message (if failed)import { redis } from '@boringnode/queue/drivers/redis_adapter'
// With options
const adapter = redis({ host: 'localhost', port: 6379 })
// With existing ioredis instance
import { Redis } from 'ioredis'
const connection = new Redis({ host: 'localhost' })
const adapter = redis(connection)import { knex } from '@boringnode/queue/drivers/knex_adapter'
const adapter = knex({
client: 'pg',
connection: { host: 'localhost', database: 'myapp' },
})More Knex examples
// With existing Knex instance
import Knex from 'knex'
const connection = Knex({ client: 'pg', connection: '...' })
const adapter = knex(connection)
// Custom table name
const adapter = knex(config, 'custom_jobs_table')The adapter automatically creates tables on first use.
import { sync } from '@boringnode/queue/drivers/sync_adapter'
const adapter = sync() // Jobs execute immediatelyexport default class MyJob extends Job<Payload> {
static options: JobOptions = {
queue: 'email', // Queue name (default: 'default')
priority: 1, // Lower = higher priority (default: 5)
maxRetries: 3, // Retry attempts before failing
timeout: '30s', // Max execution time
failOnTimeout: true, // Fail permanently on timeout (default: retry)
removeOnComplete: { count: 100 }, // Keep last 100 completed
removeOnFail: { age: '7d' }, // Keep failed for 7 days
}
}await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
await SendEmailJob.dispatch(payload).in('1d') // 1 dayimport { exponentialBackoff } from '@boringnode/queue'
export default class ReliableJob extends Job<Payload> {
static options: JobOptions = {
maxRetries: 5,
retry: {
backoff: () => exponentialBackoff({
baseDelay: '1s',
maxDelay: '1m',
multiplier: 2,
jitter: true,
}),
},
}
}Available strategies
import { exponentialBackoff, linearBackoff, fixedBackoff } from '@boringnode/queue'
// Exponential: 1s, 2s, 4s, 8s...
exponentialBackoff({ baseDelay: '1s', maxDelay: '1m', multiplier: 2 })
// Linear: 1s, 2s, 3s, 4s...
linearBackoff({ baseDelay: '1s', maxDelay: '30s', multiplier: 1 })
// Fixed: 5s, 5s, 5s...
fixedBackoff({ baseDelay: '5s', jitter: true })export default class LongRunningJob extends Job<Payload> {
static options: JobOptions = {
timeout: '30s',
failOnTimeout: false, // Will retry (default)
}
async execute(): Promise<void> {
for (const item of this.payload.items) {
// Check abort signal for graceful timeout handling
if (this.signal?.aborted) {
throw new Error('Job timed out')
}
await this.processItem(item)
}
}
}Access execution metadata via this.context:
async execute(): Promise<void> {
console.log(this.context.jobId) // Unique job ID
console.log(this.context.attempt) // 1, 2, 3...
console.log(this.context.queue) // Queue name
console.log(this.context.priority) // Priority value
console.log(this.context.acquiredAt) // When acquired
console.log(this.context.stalledCount) // Stall recoveries
}Run jobs on a recurring basis:
// Every 10 seconds
await MetricsJob.schedule({ endpoint: '/health' })
.every('10s')
// Cron schedule
await CleanupJob.schedule({ days: 30 })
.id('daily-cleanup')
.cron('0 0 * * *') // Midnight daily
.timezone('Europe/Paris')Schedule management
import { Schedule } from '@boringnode/queue'
// Find and manage
const schedule = await Schedule.find('daily-cleanup')
await schedule.pause()
await schedule.resume()
await schedule.trigger() // Run now
await schedule.delete()
// List schedules
const all = await Schedule.list()
const active = await Schedule.list({ status: 'active' })Schedule options:
| Method | Description |
|---|---|
.id(string) |
Unique identifier |
.every(duration) |
Fixed interval ('5s', '1m', '1h') |
.cron(expression) |
Cron schedule |
.timezone(tz) |
Timezone (default: 'UTC') |
.from(date) |
Start boundary |
.to(date) |
End boundary |
.limit(n) |
Maximum runs |
Integrate with IoC containers:
await QueueManager.init({
// ...
jobFactory: async (JobClass) => {
return app.container.make(JobClass)
},
})Example with injected services
export default class SendEmailJob extends Job<SendEmailPayload> {
constructor(
private mailer: MailerService,
private logger: Logger
) {
super()
}
async execute(): Promise<void> {
this.logger.info(`Sending email to ${this.payload.to}`)
await this.mailer.send(this.payload)
}
}const config = {
worker: {
concurrency: 5, // Parallel jobs
idleDelay: '2s', // Poll interval when idle
timeout: '1m', // Default job timeout
stalledThreshold: '30s', // When to consider job stalled
stalledInterval: '30s', // How often to check
maxStalledCount: 1, // Max recoveries before failing
gracefulShutdown: true, // Wait for jobs on SIGTERM
},
}import { pino } from 'pino'
await QueueManager.init({
// ...
logger: pino(),
})Performance comparison with BullMQ (5ms simulated work per job):
| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff |
|---|---|---|---|---|
| 1000 | 5 | 1096ms | 1116ms | 1.8% faster |
| 1000 | 10 | 565ms | 579ms | 2.4% faster |
| 100K | 10 | 56.2s | 57.5s | 2.1% faster |
| 100K | 20 | 29.1s | 29.6s | 1.7% faster |
npm run benchmark -- --realistic