Build Reliable Background Jobs with PostgreSQL Queues
Build reliable background jobs with PostgreSQL queues in EliteSaaS: SKIP LOCKED claiming, retries, cron scheduling, progress tracking, and Realtime monitoring.
Introduction
If you’re building a modern SaaS with Next.js and Supabase, you need background jobs: send emails, process Stripe events, generate AI content, schedule social posts, and run nightly tasks. EliteSaaS ships with a PostgreSQL-backed job queue that replaces Redis-based systems with a simpler, zero-extra-infrastructure approach—complete with priority management, exponential backoff retries, cron scheduling, multi-step progress tracking, and real-time monitoring via Supabase Realtime.
This in-depth engineering guide walks through the data model, worker design, retry strategies, performance indexing, and operational practices we use in EliteSaaS. You’ll also see practical patterns for email batching, social media scheduling, AI content generation, and subscription webhook processing.
Who this is for:
- Developers who want reliable background processing without standing up Redis
- Teams that value simplicity, cost-efficiency, and first-party data
- Builders who need clear patterns and production-ready guardrails
Why a PostgreSQL-backed queue?
A Redis queue is great for ultra-low-latency, high-throughput tasks—but it adds an extra moving part. For most SaaS workloads, PostgreSQL offers a sweet spot:
- One database, one source of truth: no extra service to deploy or pay for
- Transactional consistency: enqueue work atomically with domain writes
- Real-time monitoring: Supabase Realtime streams changes out-of-the-box
- Lower ops overhead: fewer services to observe, secure, and scale
Tradeoff: query latency (20–100ms) is higher than Redis (1–5ms). For email, webhooks, content generation, reporting, and scheduled tasks, that’s more than acceptable.
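One concrete benefit of transactional consistency: you can enqueue a job in the same transaction as the domain write that triggers it, so you never end up with the record but not the job (or vice versa). A minimal sketch, using the background_jobs table defined below and a hypothetical invoices table:
-- Hypothetical example: the domain write and the enqueue commit (or roll back) together
begin;

insert into public.invoices (user_id, amount_cents)
values ('00000000-0000-0000-0000-000000000000', 4900);

insert into public.background_jobs (type, payload, priority)
values ('email_batch', jsonb_build_object('template', 'invoice_created'), 4);

commit;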
Data model: background_jobs and job_steps
EliteSaaS uses two core tables. background_jobs tracks each unit of work; job_steps records granular progress for multi-step jobs (perfect for long-running tasks).
Background jobs table
-- Job status and step enums
create type job_status as enum ('pending', 'running', 'completed', 'failed', 'canceled');
create type step_status as enum ('pending', 'running', 'completed', 'failed');
-- Core queue table
create table if not exists public.background_jobs (
id uuid primary key default gen_random_uuid(),
type text not null, -- e.g., 'email_batch', 'publish_social', 'ai_generate', 'stripe_event'
payload jsonb not null default '{}'::jsonb, -- parameters for the job
status job_status not null default 'pending',
priority smallint not null default 3, -- 1=low, 3=normal, 5=high
scheduled_for timestamptz not null default now(),
attempts int not null default 0,
max_attempts int not null default 5,
backoff_strategy text not null default 'exponential', -- 'exponential' | 'fixed'
last_error text,
external_id text, -- e.g., Stripe event id for idempotency
claimed_by text,
claimed_at timestamptz,
started_at timestamptz,
completed_at timestamptz,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
-- Idempotency for externally-sourced jobs (e.g., Stripe events)
create unique index if not exists background_jobs_external_id_idx
on public.background_jobs (external_id)
where external_id is not null;
-- Fast selection of runnable jobs (column order matches the claim query's ORDER BY)
create index if not exists background_jobs_pending_idx
on public.background_jobs (priority desc, scheduled_for asc)
where status = 'pending';
-- Basic status index for dashboards
create index if not exists background_jobs_status_idx
on public.background_jobs (status);
-- Updated-at trigger for convenience
create or replace function public.touch_updated_at()
returns trigger language plpgsql as $$
begin new.updated_at = now(); return new; end; $$;
create trigger background_jobs_touch_updated
before update on public.background_jobs
for each row execute function public.touch_updated_at();
Job steps table
create table if not exists public.job_steps (
id uuid primary key default gen_random_uuid(),
job_id uuid not null references public.background_jobs(id) on delete cascade,
step_number int not null, -- 1,2,3,...
label text not null, -- human-readable
status step_status not null default 'pending',
progress int not null default 0, -- 0-100
metadata jsonb not null default '{}'::jsonb,
started_at timestamptz,
completed_at timestamptz,
created_at timestamptz not null default now()
);
create index if not exists job_steps_job_id_idx
on public.job_steps (job_id);
Job lifecycle and priority management
A typical lifecycle:
- Enqueue: insert into background_jobs with type, payload, priority, and scheduled_for
- Claim: a worker atomically claims the next runnable job (pending and scheduled_for <= now())
- Run: worker sets status=running, started_at=now(), and executes handlers
- Progress: insert/update job_steps as sub-steps complete, updating progress
- Complete: set status=completed, completed_at=now()
- Fail and retry: on error, increment attempts and reschedule with backoff if attempts < max_attempts
- Exhausted: status=failed once max_attempts reached
Priority is a smallint (1–5). The selector orders by priority DESC and scheduled_for ASC so urgent work is processed first, while respecting schedule times.
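Enqueueing from application code is a plain insert. A minimal sketch with supabase-js (the payload fields here are illustrative):
import { createClient } from '@supabase/supabase-js';

const supabase = createClient(
  process.env.NEXT_PUBLIC_SUPABASE_URL!,
  process.env.SUPABASE_SERVICE_ROLE_KEY!
);

// Enqueue a high-priority social post to run in one minute
await supabase.from('background_jobs').insert({
  type: 'publish_social',
  payload: { platform: 'linkedin', postId: 'post_123' }, // illustrative payload
  priority: 5, // 1=low, 3=normal, 5=high
  scheduled_for: new Date(Date.now() + 60_000).toISOString(),
});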
Claiming jobs safely (SKIP LOCKED)
We encapsulate selection and claiming in a SECURITY DEFINER function so workers can claim work atomically with FOR UPDATE SKIP LOCKED, even with RLS enabled.
create or replace function public.claim_next_job(p_worker_id text)
returns public.background_jobs
language plpgsql
security definer
set search_path = public
as $$
declare v_job public.background_jobs;
begin
-- Select the highest-priority runnable job and lock it immediately
select * into v_job
from public.background_jobs
where status = 'pending' and scheduled_for <= now()
order by priority desc, scheduled_for asc, created_at asc
limit 1
for update skip locked;
if not found then
return null;
end if;
update public.background_jobs
set status = 'running', claimed_by = p_worker_id, claimed_at = now(), started_at = now()
where id = v_job.id
returning * into v_job;
return v_job;
end; $$;
Handling completion and failure (with exponential backoff):
create or replace function public.complete_job(p_job_id uuid)
returns void language sql as $$
update public.background_jobs
set status = 'completed', completed_at = now()
where id = p_job_id;
$$;
create or replace function public.fail_job(p_job_id uuid, p_error text)
returns void language plpgsql as $$
declare v_job public.background_jobs; v_delay interval; begin
select * into v_job from public.background_jobs where id = p_job_id for update;
if not found then return; end if;
if v_job.backoff_strategy = 'exponential' then
v_delay := make_interval(mins => least(pow(2, v_job.attempts)::int, 60)); -- capped at 60 minutes
else
v_delay := interval '5 minutes';
end if;
if v_job.attempts + 1 >= v_job.max_attempts then
update public.background_jobs
set status = 'failed', attempts = v_job.attempts + 1, last_error = p_error, completed_at = now()
where id = p_job_id;
else
update public.background_jobs
set status = 'pending', attempts = v_job.attempts + 1,
last_error = p_error, scheduled_for = now() + v_delay
where id = p_job_id;
end if;
end; $$;
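With the defaults (max_attempts = 5, exponential backoff), a failing job is retried roughly 1, 2, 4, and 8 minutes after its first four failures and is marked failed on the fifth.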
Worker process (TypeScript) with progress reporting
Workers run as Node processes (or serverless background runners) using the Supabase service role key. They claim jobs via RPC and execute handlers based on job.type.
// worker.ts
import 'dotenv/config';
import { createClient } from '@supabase/supabase-js';
const supabase = createClient(
process.env.NEXT_PUBLIC_SUPABASE_URL!,
process.env.SUPABASE_SERVICE_ROLE_KEY!
);
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
// Helper to record a step with progress
async function runStep(jobId: string, stepNumber: number, label: string, fn: () => Promise<void>) {
const { data: step, error: stepError } = await supabase
.from('job_steps')
.insert({ job_id: jobId, step_number: stepNumber, label, status: 'running', progress: 0, started_at: new Date().toISOString() })
.select()
.single();
if (stepError || !step) throw stepError ?? new Error(`Failed to create step ${stepNumber}`);
try {
await fn();
await supabase
.from('job_steps')
.update({ status: 'completed', progress: 100, completed_at: new Date().toISOString() })
.eq('id', step!.id);
} catch (err: any) {
await supabase
.from('job_steps')
.update({ status: 'failed', metadata: { error: String(err?.message || err) } })
.eq('id', step!.id);
throw err;
}
}
// Handlers
async function handleEmailBatch(job: any) { /* send with Resend, batching */ }
async function handlePublishSocial(job: any) { /* LinkedIn/Twitter posting */ }
async function handleAIGenerate(job: any) { /* OpenAI/Claude/Gemini */ }
async function handleStripeEvent(job: any) { /* idempotent Stripe sync */ }
async function processJob(job: any) {
switch (job.type) {
case 'email_batch':
await runStep(job.id, 1, 'Compile audience', async () => { /* ... */ });
await runStep(job.id, 2, 'Render email', async () => { /* ... */ });
await runStep(job.id, 3, 'Send batch', async () => { /* ... */ });
break;
case 'publish_social':
await runStep(job.id, 1, 'Prepare media', async () => { /* ... */ });
await runStep(job.id, 2, 'Post to platform', async () => { /* ... */ });
break;
case 'ai_generate':
await runStep(job.id, 1, 'Generate outline', async () => { /* ... */ });
await runStep(job.id, 2, 'Write draft', async () => { /* ... */ });
break;
case 'stripe_event':
await runStep(job.id, 1, 'Apply event', async () => { /* ... */ });
break;
default:
throw new Error(`Unknown job type: ${job.type}`);
}
}
async function main() {
const workerId = `worker-${Math.random().toString(36).slice(2, 8)}`;
// Simple loop; consider concurrency pools for higher throughput
while (true) {
const { data: job } = await supabase.rpc('claim_next_job', { p_worker_id: workerId });
if (!job || !job.id) { // guard both a null result and an all-null row (no runnable job)
await sleep(500); // idle backoff
continue;
}
try {
await processJob(job);
await supabase.rpc('complete_job', { p_job_id: job.id });
} catch (err: any) {
await supabase.rpc('fail_job', { p_job_id: job.id, p_error: String(err?.message || err) });
}
}
}
main().catch((e) => {
console.error('Worker crashed', e);
process.exit(1);
});
Tip: run multiple worker processes in parallel for higher throughput. SKIP LOCKED in the RPC ensures safe contention across processes.
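If you would rather scale inside a single process, a small concurrency pool works too: run several claim loops side by side, each identical to the loop in main(). A sketch, assuming processJob, sleep, and the supabase client from worker.ts are exported or moved into a shared module:
// pool.ts (sketch): N concurrent claim loops inside one process
async function claimLoop(workerId: string) {
  while (true) {
    const { data: job } = await supabase.rpc('claim_next_job', { p_worker_id: workerId });
    if (!job || !job.id) { await sleep(500); continue; } // nothing runnable; idle briefly
    try {
      await processJob(job);
      await supabase.rpc('complete_job', { p_job_id: job.id });
    } catch (err: any) {
      await supabase.rpc('fail_job', { p_job_id: job.id, p_error: String(err?.message || err) });
    }
  }
}

const POOL_SIZE = 4; // tune to downstream API rate limits
Promise.all(Array.from({ length: POOL_SIZE }, (_, i) => claimLoop(`worker-pool-${i}`)))
  .catch((e) => { console.error('Pool crashed', e); process.exit(1); });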
Cron scheduling and recurring jobs
Use scheduled_for for one-off scheduling. For recurring tasks, store a cron expression and let a lightweight scheduler enqueue the next run.
A dedicated recurring_jobs table
create table if not exists public.recurring_jobs (
id uuid primary key default gen_random_uuid(),
name text not null unique,
type text not null,
payload jsonb not null default '{}'::jsonb,
cron text not null, -- e.g., "0 * * * *" (hourly)
priority smallint not null default 3,
last_enqueued_at timestamptz,
active boolean not null default true
);
A tiny scheduler reads active rows and enqueues background_jobs at the right times.
// scheduler.ts
import parser from 'cron-parser';
import { createClient } from '@supabase/supabase-js';
const supabase = createClient(process.env.NEXT_PUBLIC_SUPABASE_URL!, process.env.SUPABASE_SERVICE_ROLE_KEY!);
async function tick() {
const { data: recurrences } = await supabase.from('recurring_jobs').select('*').eq('active', true);
const now = new Date();
for (const r of recurrences || []) {
const interval = parser.parseExpression(r.cron, { currentDate: r.last_enqueued_at || new Date(0) });
const next = interval.next().toDate();
if (next <= now) {
await supabase.from('background_jobs').insert({ type: r.type, payload: r.payload, priority: r.priority, scheduled_for: now.toISOString() });
await supabase.from('recurring_jobs').update({ last_enqueued_at: now.toISOString() }).eq('id', r.id);
}
}
}
setInterval(tick, 15_000); // check every 15s
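Registering a recurring task is then a single row; the scheduler picks it up on its next tick. For example (the digest job here is illustrative):
-- Enqueue a daily email digest at 03:00 UTC
insert into public.recurring_jobs (name, type, payload, cron, priority)
values ('daily-digest', 'email_batch', '{"template": "daily_digest"}'::jsonb, '0 3 * * *', 3);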
Real-time monitoring with Supabase Realtime
The admin dashboard streams job and step updates for live visibility. Subscribe to database changes and render status, progress, and errors as they happen.
// admin-monitoring.ts (UI pseudocode)
import { createClient } from '@supabase/supabase-js';
const supabase = createClient(process.env.NEXT_PUBLIC_SUPABASE_URL!, process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY!);
// Stream job status changes
supabase
.channel('jobs')
.on('postgres_changes', { event: '*', schema: 'public', table: 'background_jobs' }, (payload) => {
// Update job cards in UI
// payload.new has latest row
})
.on('postgres_changes', { event: '*', schema: 'public', table: 'job_steps' }, (payload) => {
// Update step-by-step progress UI
})
.subscribe();
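One setup note: Postgres Changes are only emitted for tables included in your Realtime publication. In a default Supabase project that publication is called supabase_realtime; if the queue tables are not already in it, add them once:
-- One-time setup so Realtime streams changes for the queue tables
alter publication supabase_realtime add table public.background_jobs, public.job_steps;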
Admin dashboards typically include:
- Global queue health: counts by status (pending/running/completed/failed), as shown in the query sketch after this list
- Filters: by type, priority, claimed_by, attempts, time range
- Live stream: newest updates, errors, stack traces
- Job details: payload, steps, timings, error metadata
- Actions: cancel, retry, replay, copy payload for debugging
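The queue-health counts in the first item come straight from an aggregate, for example:
-- Counts by status for the dashboard header cards
select status, count(*) as jobs
from public.background_jobs
group by status
order by status;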
Performance: indexing and archival
Indexing and cleanup keep the queue snappy:
- Fast selection index (partial):
create index if not exists background_jobs_pending_idx
on public.background_jobs (priority desc, scheduled_for asc)
where status = 'pending';
- Steps lookup index:
create index if not exists job_steps_job_id_idx
on public.job_steps (job_id);
- Idempotency (external_id) unique partial index (shown above)
Archival policy: move or soft-delete old rows to keep tables lean.
-- Nightly archival of completed jobs older than 30 days
create table if not exists public.background_jobs_archive (like public.background_jobs including all);
-- Move rows atomically so nothing is deleted without being archived
with moved as (
delete from public.background_jobs
where status in ('completed','failed') and completed_at < now() - interval '30 days'
returning *
)
insert into public.background_jobs_archive select * from moved;
If you prefer soft deletes, add deleted_at and filter in queries.
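A minimal soft-delete variant looks like this:
-- Soft deletes: mark old rows instead of removing them
alter table public.background_jobs add column if not exists deleted_at timestamptz;

update public.background_jobs
set deleted_at = now()
where status in ('completed','failed') and completed_at < now() - interval '30 days';

-- ...then filter with "where deleted_at is null" in queries and dashboards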
Common patterns you’ll use daily
1) Email batching
- Type: email_batch
- Steps: compile audience → render template → send batch → record analytics
- Notes: throttle sends, chunk large lists, tag UTM parameters for attribution
Pseudocode:
async function handleEmailBatch(job: any) {
const { segmentId, templateId } = job.payload;
await runStep(job.id, 1, 'Compile audience', async () => {
// Query your audience (e.g., verified users, sequence enrollees)
});
await runStep(job.id, 2, 'Render template', async () => {
// Build HTML using React Email; personalize vars
});
await runStep(job.id, 3, 'Send batch', async () => {
// Chunk recipients and send via Resend with rate limits
});
}
2) Social post scheduling
- Type: publish_social
- Steps: prepare media → post to platform → record link/analytics
- Notes: gracefully handle token refresh, exponential retry on rate limits
async function handlePublishSocial(job: any) {
const { platform, postId } = job.payload; // e.g., 'linkedin' or 'twitter'
await runStep(job.id, 1, 'Prepare media', async () => { /* download/resize */ });
await runStep(job.id, 2, 'Publish', async () => {
if (platform === 'linkedin') { /* LinkedInService.post(...) */ }
if (platform === 'twitter') { /* TwitterService.tweet(...) */ }
});
}
3) AI content generation
- Type: ai_generate
- Steps: generate outline → draft content → refine → store in library
- Notes: stream tokens to update progress; choose provider by content type
async function handleAIGenerate(job: any) {
const { provider, topic, tone } = job.payload;
await runStep(job.id, 1, 'Generate outline', async () => { /* Vercel AI SDK call */ });
await runStep(job.id, 2, 'Write draft', async () => { /* stream + persist */ });
await runStep(job.id, 3, 'Save', async () => { /* insert into ai_content */ });
}
4) Subscription webhook processing (Stripe)
- Type: stripe_event
- Steps: validate → apply changes → record idempotently
- Notes: enqueue with external_id = event.id so redelivered events are deduplicated at enqueue time
Enqueue from your webhook handler:
// Inside /api/stripe/webhook after verifying signature
const { error } = await supabase
.from('background_jobs')
.insert({ type: 'stripe_event', payload: event, external_id: event.id, priority: 5 });
// Redelivered events hit the unique partial index on external_id; ignore that duplicate
if (error && error.code !== '23505') throw error; // 23505 = unique_violation
Handler:
async function handleStripeEvent(job: any) {
const event = job.payload;
// Switch on event.type and update subscriptions/products accordingly
// Use service role client to bypass RLS where appropriate
}
Operational tips: scaling, alerting, replay
- Horizontal scaling: run N worker processes per environment. SKIP LOCKED prevents double-claiming.
- Concurrency pools: process multiple jobs in parallel with a small pool size for throughput without overwhelming downstream APIs.
- Backpressure: cap maximum concurrent jobs and prioritize critical types (priority=5) when the queue grows.
- Alerting: subscribe to failed events and send an on-call email via the email system when failed jobs exceed a threshold in a time window (see the query sketch after this list)
- Dead-letter strategy: treat jobs that hit max_attempts as failed; allow manual replay from the admin dashboard after investigating.
- Replay safely: ensure handlers are idempotent (external_id, dedup keys), or add a dry-run step for destructive operations.
- Observability: log durations per step, attempts, and error codes; chart failure rate by type to locate flakiness quickly.
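For the alerting and observability tips, a couple of aggregates go a long way; the thresholds and windows below are illustrative:
-- Failed jobs per type in the last hour; alert when any count crosses your threshold
select type, count(*) as failed_last_hour
from public.background_jobs
where status = 'failed' and updated_at > now() - interval '1 hour'
group by type
having count(*) >= 5
order by failed_last_hour desc;

-- Average runtime per type over the last day (spot slow or flaky handlers)
select type,
avg(completed_at - started_at) as avg_duration,
count(*) filter (where status = 'failed') as failures
from public.background_jobs
where created_at > now() - interval '1 day'
group by type;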
Tradeoffs vs Redis (and when to consider it)
- Latency: PostgreSQL selection is tens of milliseconds; Redis is single-digit. For most SaaS tasks, the difference is irrelevant.
- Throughput: PostgreSQL handles thousands of jobs per day comfortably (and typically far more). If you need thousands of jobs per second, Redis/BullMQ is the better fit.
- Complexity: PostgreSQL keeps your system simpler—one datastore, transactional enqueue, and Realtime monitoring for free.
- Cost: avoid an extra $25–$100/month for managed Redis and the ops overhead.
Rule of thumb: start with PostgreSQL for 95% of SaaS workloads. Consider Redis only if you can quantify a clear latency/throughput requirement that Postgres cannot meet.
Related reads (for real-world job types)
- Email Automation: how sequences, batching, and personalization are orchestrated as jobs
- Social Media Automation: scheduling and posting to LinkedIn and Twitter/X
- AI Content Generation: multi-step drafts, streaming, and library storage
- Admin Dashboard Monitoring: live queue health, search, filters, and replay controls
Note: see these companion posts in your documentation/blog library.
Conclusion
A PostgreSQL-backed queue gives you a reliable, cost-efficient background job system without the complexity of Redis. With SKIP LOCKED claiming, priority scheduling, exponential backoff, cron-based recurrence, and real-time visibility, you can operate everything from email campaigns to Stripe webhooks and AI content generation with confidence.
If you’re using EliteSaaS, you already have the foundation: background_jobs, job_steps, admin dashboards, and production patterns are included. Start by enqueuing a simple job, watch it flow through the dashboard, and then expand to multi-step workflows as your product grows.
Call to action: Start building your background jobs today. Open your EliteSaaS project, enable the worker process, and ship your first automated workflow in minutes. When you’re ready, scale out workers, add alerts, and let the queue power your product while you focus on features customers love.