
Job Retention, Bulk Dispatch & Storage Redesign


@RomainLanz released this 15 Jan 16:40 · 17 commits to main since this release

Tag: v0.3.0 · Commit: 1fe1515

Breaking Changes

Redis + Knex storage layout

This release introduces a new storage layout for both the Redis and Knex adapters. Existing data is incompatible and requires migration.

The storage architecture has been redesigned from the ground up to support new features such as job retention and status tracking. Migration steps for each adapter follow.


Redis adapter migration

New storage layout

The Redis adapter now stores job payloads in a dedicated hash and tracks queue state in separate sets/hashes:

| Key | Type | Description |
| --- | --- | --- |
| `jobs::<queue>::data` | Hash | `jobId` -> job payload JSON |
| `jobs::<queue>::pending` | Sorted Set | `jobId` -> priority score |
| `jobs::<queue>::delayed` | Sorted Set | `jobId` -> `executeAt` timestamp |
| `jobs::<queue>::active` | Hash | `jobId` -> `{ workerId, acquiredAt }` |
| `jobs::<queue>::completed` | Hash | `jobId` -> `{ finishedAt }` |
| `jobs::<queue>::completed::index` | Sorted Set | `jobId` -> `finishedAt` (for pruning) |
| `jobs::<queue>::failed` | Hash | `jobId` -> `{ finishedAt, error }` |
| `jobs::<queue>::failed::index` | Sorted Set | `jobId` -> `finishedAt` (for pruning) |
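
For example, the new layout can be inspected directly with an ioredis client. This is a sketch: the queue name `emails` and the default connection are illustrative.

```ts
import { Redis } from 'ioredis'

const redis = new Redis()

// Pending jobs, ordered by priority score (see the table above).
const pending = await redis.zrange('jobs::emails::pending', 0, -1, 'WITHSCORES')

// Payload of the first pending job, stored as JSON in the data hash.
const payload = await redis.hget('jobs::emails::data', pending[0])
console.log(JSON.parse(payload ?? 'null'))
```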

Migration steps

Option 1: Flush existing data (recommended for alpha users)

```sh
redis-cli KEYS "jobs::*" | xargs redis-cli DEL
```

Option 2: Wait for existing jobs to complete

  1. Stop pushing new jobs
  2. Wait for all workers to drain existing queues
  3. Deploy new version
  4. Clean up old keys:
```sh
# Remove old format keys (adjust pattern to your prefix)
redis-cli KEYS "jobs::*" | grep -v "::data\|::pending\|::delayed\|::active\|::completed\|::failed" | xargs redis-cli DEL
```

Knex adapter migration

Schema changes

The Knex adapter now persists completed/failed job state. Existing tables need these changes:

| Column | Type | Description |
| --- | --- | --- |
| `finished_at` | BIGINT (nullable) | Completion/failure timestamp |
| `error` | TEXT (nullable) | Error message for failed jobs |
| `status` | ENUM | Add `completed` and `failed` values |

New index for pruning queries:

  • (queue, status, finished_at)
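
If you manage the table through Knex migrations, the column and index changes might look like the following sketch (the `status` enum change is dialect-specific, so it is left to the raw SQL below):

```ts
import type { Knex } from 'knex'

export async function up(knex: Knex): Promise<void> {
  await knex.schema.alterTable('queue_jobs', (table) => {
    table.bigInteger('finished_at').nullable() // completion/failure timestamp
    table.text('error').nullable() // error message for failed jobs
    table.index(['queue', 'status', 'finished_at'], 'idx_queue_jobs_finished')
  })
  // The status enum change is dialect-specific; see the raw SQL per database below.
}

export async function down(knex: Knex): Promise<void> {
  await knex.schema.alterTable('queue_jobs', (table) => {
    table.dropIndex(['queue', 'status', 'finished_at'], 'idx_queue_jobs_finished')
    table.dropColumn('finished_at')
    table.dropColumn('error')
  })
}
```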

Migration SQL

PostgreSQL:

```sql
-- Add new columns
ALTER TABLE queue_jobs ADD COLUMN finished_at BIGINT;
ALTER TABLE queue_jobs ADD COLUMN error TEXT;

-- Add new enum values (PostgreSQL specific)
ALTER TYPE queue_jobs_status ADD VALUE 'completed';
ALTER TYPE queue_jobs_status ADD VALUE 'failed';

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```

MySQL:

```sql
-- Add new columns
ALTER TABLE queue_jobs ADD COLUMN finished_at BIGINT UNSIGNED NULL;
ALTER TABLE queue_jobs ADD COLUMN error TEXT NULL;

-- Modify enum to include new values
ALTER TABLE queue_jobs MODIFY COLUMN status ENUM('pending', 'active', 'delayed', 'completed', 'failed') NOT NULL;

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```

SQLite:

```sql
-- Add new columns (SQLite doesn't enforce enum, so status just works)
ALTER TABLE queue_jobs ADD COLUMN finished_at INTEGER;
ALTER TABLE queue_jobs ADD COLUMN error TEXT;

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```

Fresh install

For new installations, drop and recreate the table to get the new schema automatically:

```sql
DROP TABLE IF EXISTS queue_jobs;
-- Table will be recreated on first adapter use
```

New Features

Bulk job dispatch with dispatchMany

Jobs can now be dispatched in batches using Job.dispatchMany(). This is more efficient than calling dispatch() repeatedly, because each batch runs as a single optimized operation per adapter (a Redis MULTI/EXEC transaction or an SQL batch insert).

```ts
// Dispatch multiple jobs at once
const { jobIds } = await SendEmailJob.dispatchMany([
  { to: '[email protected]', subject: 'Newsletter' },
  { to: '[email protected]', subject: 'Newsletter' },
  { to: '[email protected]', subject: 'Newsletter' },
])
  .group('newsletter-jan-2025')
  .toQueue('emails')
  .priority(3)
  .run()

console.log(`Dispatched ${jobIds.length} jobs`)
```

Use cases

  • Newsletters: Send thousands of emails in a single batch operation (see the chunking sketch after this list)
  • Bulk exports: Create export jobs for multiple users
  • Data migrations: Queue many transformation jobs at once
  • Notifications: Dispatch notifications to many recipients
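
For very large batches, you may want to bound memory and transaction size by chunking the payload list. Here is a hypothetical helper built on the API above; the chunk size of 1,000 is arbitrary:

```ts
// Hypothetical helper: dispatch a very large payload list in bounded chunks.
// SendEmailJob and its fluent API are as shown above.
async function dispatchNewsletter(
  payloads: Array<{ to: string; subject: string }>,
  chunkSize = 1_000,
): Promise<string[]> {
  const allIds: string[] = []
  for (let i = 0; i < payloads.length; i += chunkSize) {
    const { jobIds } = await SendEmailJob.dispatchMany(payloads.slice(i, i + chunkSize))
      .group('newsletter-jan-2025')
      .toQueue('emails')
      .run()
    allIds.push(...jobIds)
  }
  return allIds
}
```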

API

The JobBatchDispatcher supports the same fluent API as JobDispatcher:

```ts
await SendEmailJob.dispatchMany(payloads)
  .toQueue('emails')      // Target queue
  .group('batch-123')     // Group all jobs together
  .priority(1)            // Set priority for all jobs
  .with('redis')          // Use specific adapter
  .run()
```

Adapter API

For low-level access, adapters now support pushMany() and pushManyOn():

```ts
await adapter.pushManyOn('emails', [
  { id: 'uuid1', name: 'SendEmailJob', payload: {...}, attempts: 0 },
  { id: 'uuid2', name: 'SendEmailJob', payload: {...}, attempts: 0 },
])
```

Job grouping with groupId

Jobs can now be assigned to a group using the groupId option. This allows organizing related jobs together for easier monitoring and filtering in UIs.

```ts
// Assign jobs to a group
await SendEmailJob.dispatch({ to: '[email protected]' })
  .group('newsletter-jan-2025')
  .run()

// Combine with other options
await ExportJob.dispatch({ userId: 1 })
  .group('batch-export-123')
  .toQueue('exports')
  .priority(2)
  .run()
```

Use cases

  • Batch operations: Group all jobs from a newsletter send, bulk export, or data migration
  • Monitoring: Filter and view related jobs together in queue UIs
  • Debugging: Easily find all jobs related to a specific operation

API

The groupId is stored with the job data and can be accessed via:

```ts
const record = await adapter.getJob('job-id', 'queue-name')
console.log(record?.data.groupId) // 'newsletter-jan-2025' (getJob returns null for unknown jobs)
```

Job retention

Jobs can now be kept in history after completion or failure using the removeOnComplete and removeOnFail options:

```ts
// Global configuration
await QueueManager.init({
  // ...
  defaultJobOptions: {
    removeOnComplete: false, // Keep all completed jobs
    removeOnFail: { count: 100 }, // Keep last 100 failed jobs
  },
})

// Per-queue configuration
await QueueManager.init({
  // ...
  queues: {
    critical: {
      defaultJobOptions: {
        removeOnFail: { age: '7d', count: 1000 }, // Keep for 7 days, max 1000
      },
    },
  },
})

// Per-job configuration (via Job class)
class MyJob extends Job {
  static options = {
    removeOnComplete: { count: 50 },
  }
}
```

Retention options

  • `true` (default): Remove the job immediately after completion/failure
  • `false`: Keep the job in history indefinitely
  • `{ age?: Duration, count?: number }`: Keep with pruning by age and/or count
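
As an illustration of how the `{ age, count }` form behaves, here is a hypothetical pruning predicate (a sketch of the rule, not the library's implementation), assuming a job is pruned once it is older than `age` or falls outside the newest `count` finished jobs:

```ts
type RetentionOption = true | false | { age?: string; count?: number }

// Hypothetical sketch of the pruning rule: a finished job is removed when
// retention is `true`, when it is older than `age`, or when it falls outside
// the newest `count` jobs. Duration parsing is simplified to days only.
function shouldPrune(
  retention: RetentionOption,
  ageMs: number, // now - finishedAt
  positionFromNewest: number, // 0 = most recently finished job
): boolean {
  if (retention === true) return true
  if (retention === false) return false
  const maxAgeMs = retention.age ? parseDays(retention.age) : Infinity
  const maxCount = retention.count ?? Infinity
  return ageMs > maxAgeMs || positionFromNewest >= maxCount
}

function parseDays(duration: string): number {
  // Toy parser for durations like '7d'; the real Duration type is richer.
  return Number(duration.replace('d', '')) * 24 * 60 * 60 * 1000
}
```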

Job status API

A new getJob method allows retrieving job status and data:

```ts
const adapter = QueueManager.use()
const record = await adapter.getJob('job-id', 'queue-name')

if (record) {
  console.log(record.status) // 'pending' | 'active' | 'delayed' | 'completed' | 'failed'
  console.log(record.data) // Original job data
  console.log(record.finishedAt) // Timestamp (for completed/failed)
  console.log(record.error) // Error message (for failed)
}
```
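
For example, a hypothetical polling helper built on getJob (note that with the default removeOnComplete: true a finished job is deleted immediately, so polling for completed requires retention to be enabled):

```ts
// Hypothetical helper: poll getJob() until the job reaches a terminal state.
// Assumes the getJob API shown above; the interval and timeout are arbitrary.
async function waitForJob(jobId: string, queue: string, timeoutMs = 30_000) {
  const adapter = QueueManager.use()
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    const record = await adapter.getJob(jobId, queue)
    if (record && (record.status === 'completed' || record.status === 'failed')) {
      return record
    }
    await new Promise((resolve) => setTimeout(resolve, 500))
  }
  throw new Error(`Timed out waiting for job ${jobId}`)
}
```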

Commits

  • docs: refactor README with new features and better structure (a0bc28c)
  • fix: prevent double-claim on SQLite with status guard (1a089dd)
  • feat: add dispatchMany for bulk job dispatch (9bc5fa8)
  • feat: add groupId support for job grouping (b2594d2)
  • docs(examples): add job retention options (bd3b8d1)
  • chore(tests): remove unnecessary locations config and debug logs (046faf1)
  • feat: add job history and retention (1db9f85)
  • feat: add job retention types (839f42e)
  • chore: add funding file (69f9f39)
  • docs: use markdown note (3f33166)

Full Changelog: v0.2.0...v0.3.0