Skip to content

Commit c1ac86c

Browse files
authored
feat: add automatic requeue for stalled tasks via cron job (#591)
# Add automatic requeue for stalled tasks via cron job This PR implements a system to automatically detect and requeue tasks that have stalled due to worker crashes or other issues. Key features: - Added a `requeue_stalled_tasks()` function that identifies tasks stuck in 'started' status beyond their timeout window plus a 30-second buffer - Tasks can be requeued up to 3 times; after that their queue message is archived and the task is flagged via `permanently_stalled_at` while its status stays 'started' - Added tracking columns to `step_tasks` table: `requeued_count`, `last_requeued_at` and `permanently_stalled_at` - Implemented a configurable cron job via `setup_requeue_stalled_tasks_cron()` that runs every 15 seconds by default - Added comprehensive test suite covering basic requeuing, max requeue limits, and multi-flow scenarios - Increased default visibility timeout in edge-worker from 2 to 5 seconds for better reliability This enhancement improves system resilience by ensuring tasks don't remain stuck when workers crash unexpectedly, addressing issue #586.
1 parent 28ad59a commit c1ac86c

12 files changed

+678
-11
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@pgflow/core': patch
3+
'@pgflow/edge-worker': patch
4+
---
5+
6+
Add automatic requeue for stalled tasks via cron job - tasks stuck beyond timeout+30s are requeued up to 3 times; after that their queue message is archived and the task keeps status 'started' with `permanently_stalled_at` set for easy identification (closes #586)

pkgs/core/schemas/0060_tables_runtime.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ create table pgflow.step_tasks (
8181
completed_at timestamptz,
8282
failed_at timestamptz,
8383
last_worker_id uuid references pgflow.workers (worker_id) on delete set null,
84+
-- Requeue tracking columns
85+
requeued_count int not null default 0,
86+
last_requeued_at timestamptz,
87+
permanently_stalled_at timestamptz,
8488
constraint step_tasks_pkey primary key (run_id, step_slug, task_index),
8589
foreign key (run_id, step_slug)
8690
references pgflow.step_states (run_id, step_slug),
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
-- Requeue tasks that have been in 'started' status for longer than the flow
-- timeout plus a 30 second buffer, i.e. tasks left behind by a worker that
-- crashed without completing them.
--
-- For every stalled task found:
--   * requeued_count <  3: status is set back to 'queued', started_at and
--     last_worker_id are cleared, requeued_count is incremented,
--     last_requeued_at is stamped, and the queue message is made visible
--     immediately through pgflow.set_vt_batch.
--   * requeued_count >= 3: permanently_stalled_at is stamped and the queue
--     message is archived through pgmq.archive. The status is left as
--     'started'; rows with permanently_stalled_at set are skipped by later
--     invocations.
--
-- Returns the number of tasks requeued by this invocation. Tasks that were
-- marked permanently stalled are not included in the count.
create or replace function pgflow.requeue_stalled_tasks()
returns int
language plpgsql
security definer
set search_path = ''
as $$
declare
  result_count int := 0;
  max_requeues constant int := 3;
  -- Sinks for the helper counts below. Their values are never used; they exist
  -- only so that every CTE is read to completion by the final select.
  reset_rows bigint;
  stalled_rows bigint;
  archived_rows bigint;
begin
  with stalled_tasks as (
    -- Tasks started more than (flow timeout + 30s) ago and not yet given up on.
    -- Rows locked by another transaction are skipped rather than waited for.
    -- NOTE(review): only the flow-level opt_timeout is consulted here; if a step
    -- can override the timeout, a long-running step could be requeued early - confirm.
    select
      st.run_id,
      st.step_slug,
      st.task_index,
      st.message_id,
      r.flow_slug,
      st.requeued_count
    from pgflow.step_tasks st
    join pgflow.runs r on r.run_id = st.run_id
    join pgflow.flows f on f.flow_slug = r.flow_slug
    where st.status = 'started'
      and st.permanently_stalled_at is null
      and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
    for update of st skip locked
  ),
  -- Tasks that still have requeue attempts left
  to_requeue as (
    select
      s.run_id,
      s.step_slug,
      s.task_index,
      s.message_id,
      s.flow_slug
    from stalled_tasks s
    where s.requeued_count < max_requeues
  ),
  -- Tasks that have used up all requeue attempts
  to_archive as (
    select
      s.run_id,
      s.step_slug,
      s.task_index,
      s.message_id,
      s.flow_slug
    from stalled_tasks s
    where s.requeued_count >= max_requeues
  ),
  -- Put requeueable tasks back into 'queued'
  requeued as (
    update pgflow.step_tasks st
    set
      status = 'queued',
      started_at = null,
      last_worker_id = null,
      requeued_count = st.requeued_count + 1,
      last_requeued_at = now()
    from to_requeue tr
    where st.run_id = tr.run_id
      and st.step_slug = tr.step_slug
      and st.task_index = tr.task_index
    returning tr.flow_slug as queue_name, tr.message_id
  ),
  -- Make requeued messages visible immediately (one batched call per queue)
  visibility_reset as (
    select pgflow.set_vt_batch(
      r.queue_name,
      array_agg(r.message_id),
      array_agg(0) -- all offsets are 0 (immediate visibility)
    )
    from requeued r
    where r.message_id is not null
    group by r.queue_name
  ),
  -- Flag exhausted tasks so later invocations ignore them. This runs in the
  -- same statement as the archive below; no ordering between the two is implied.
  mark_permanently_stalled as (
    update pgflow.step_tasks st
    set permanently_stalled_at = now()
    from to_archive ta
    where st.run_id = ta.run_id
      and st.step_slug = ta.step_slug
      and st.task_index = ta.task_index
    returning st.run_id
  ),
  -- Archive the messages of exhausted tasks (one batched call per queue)
  archived as (
    select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
    from to_archive ta
    where ta.message_id is not null
    group by ta.flow_slug
  )
  -- A SELECT inside WITH is evaluated only as far as the primary query reads
  -- it. Each helper is therefore consumed by its own scalar subquery in a
  -- FROM-less select, so visibility_reset and archived are always read to
  -- completion even when "requeued" returns no rows (joining them to an empty
  -- "requeued" could let the executor skip them entirely).
  select
    (select count(*) from requeued),
    (select count(*) from visibility_reset),
    (select count(*) from mark_permanently_stalled),
    (select count(*) from archived)
  into result_count, reset_rows, stalled_rows, archived_rows;

  return result_count;
end;
$$;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
-- Cron setup function for automatic requeue monitoring.
--
-- Parameters:
--   cron_interval  schedule string handed to cron.schedule unchanged
--                  (default '15 seconds').
-- Returns a confirmation message containing the interval and the new job id.
--
-- Any job already registered under the name 'pgflow_requeue_stalled_tasks'
-- for the executing role is removed first, so repeated calls replace the job
-- instead of failing or piling up duplicates.
create or replace function pgflow.setup_requeue_stalled_tasks_cron(
  cron_interval text default '15 seconds'
)
returns text
language plpgsql
security definer
set search_path = pgflow, cron, pg_temp
as $$
declare
  job_id bigint;
begin
  -- Remove existing job if any. Looking the job up in cron.job makes a missing
  -- job a plain no-op, so no catch-all exception handler is needed and genuine
  -- failures (for example missing privileges on the cron schema) are reported
  -- instead of being silently discarded. The username filter keeps the scope
  -- limited to jobs owned by the executing role.
  perform cron.unschedule(j.jobid)
  from cron.job j
  where j.jobname = 'pgflow_requeue_stalled_tasks'
    and j.username = current_user;

  -- Schedule the new job
  job_id := cron.schedule(
    job_name => 'pgflow_requeue_stalled_tasks',
    schedule => setup_requeue_stalled_tasks_cron.cron_interval,
    command => 'select pgflow.requeue_stalled_tasks()'
  );

  return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)',
    setup_requeue_stalled_tasks_cron.cron_interval, job_id);
end;
$$;

comment on function pgflow.setup_requeue_stalled_tasks_cron(text) is
'Sets up cron job to automatically requeue stalled tasks.
Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds).
Replaces existing job if it exists (idempotent).
Returns a confirmation message with job ID.';

-- Automatically set up the cron job when migration runs
select pgflow.setup_requeue_stalled_tasks_cron();

pkgs/core/src/database-types.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,13 @@ export type Database = {
197197
error_message: string | null
198198
failed_at: string | null
199199
flow_slug: string
200+
last_requeued_at: string | null
200201
last_worker_id: string | null
201202
message_id: number | null
202203
output: Json | null
204+
permanently_stalled_at: string | null
203205
queued_at: string
206+
requeued_count: number
204207
run_id: string
205208
started_at: string | null
206209
status: string
@@ -213,10 +216,13 @@ export type Database = {
213216
error_message?: string | null
214217
failed_at?: string | null
215218
flow_slug: string
219+
last_requeued_at?: string | null
216220
last_worker_id?: string | null
217221
message_id?: number | null
218222
output?: Json | null
223+
permanently_stalled_at?: string | null
219224
queued_at?: string
225+
requeued_count?: number
220226
run_id: string
221227
started_at?: string | null
222228
status?: string
@@ -229,10 +235,13 @@ export type Database = {
229235
error_message?: string | null
230236
failed_at?: string | null
231237
flow_slug?: string
238+
last_requeued_at?: string | null
232239
last_worker_id?: string | null
233240
message_id?: number | null
234241
output?: Json | null
242+
permanently_stalled_at?: string | null
235243
queued_at?: string
244+
requeued_count?: number
236245
run_id?: string
237246
started_at?: string | null
238247
status?: string
@@ -445,10 +454,13 @@ export type Database = {
445454
error_message: string | null
446455
failed_at: string | null
447456
flow_slug: string
457+
last_requeued_at: string | null
448458
last_worker_id: string | null
449459
message_id: number | null
450460
output: Json | null
461+
permanently_stalled_at: string | null
451462
queued_at: string
463+
requeued_count: number
452464
run_id: string
453465
started_at: string | null
454466
status: string
@@ -512,10 +524,13 @@ export type Database = {
512524
error_message: string | null
513525
failed_at: string | null
514526
flow_slug: string
527+
last_requeued_at: string | null
515528
last_worker_id: string | null
516529
message_id: number | null
517530
output: Json | null
531+
permanently_stalled_at: string | null
518532
queued_at: string
533+
requeued_count: number
519534
run_id: string
520535
started_at: string | null
521536
status: string
@@ -550,6 +565,7 @@ export type Database = {
550565
isSetofReturn: true
551566
}
552567
}
568+
requeue_stalled_tasks: { Args: never; Returns: number }
553569
set_vt_batch: {
554570
Args: { msg_ids: number[]; queue_name: string; vt_offsets: number[] }
555571
Returns: {
@@ -565,6 +581,10 @@ export type Database = {
565581
Args: { cron_interval?: string }
566582
Returns: string
567583
}
584+
setup_requeue_stalled_tasks_cron: {
585+
Args: { cron_interval?: string }
586+
Returns: string
587+
}
568588
start_flow: {
569589
Args: { flow_slug: string; input: Json; run_id?: string }
570590
Returns: {
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
-- Modify "step_tasks" table
2+
ALTER TABLE "pgflow"."step_tasks" ADD COLUMN "requeued_count" integer NOT NULL DEFAULT 0, ADD COLUMN "last_requeued_at" timestamptz NULL, ADD COLUMN "permanently_stalled_at" timestamptz NULL;
3+
-- Create "requeue_stalled_tasks" function
4+
CREATE FUNCTION "pgflow"."requeue_stalled_tasks" () RETURNS integer LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = '' AS $$
5+
declare
6+
result_count int := 0;
7+
max_requeues constant int := 3;
8+
begin
9+
-- Find and requeue stalled tasks (where started_at > timeout + 30s buffer)
10+
-- Tasks with requeued_count >= max_requeues will have their message archived
11+
-- but status left as 'started' for easy identification via requeued_count column
12+
with stalled_tasks as (
13+
select
14+
st.run_id,
15+
st.step_slug,
16+
st.task_index,
17+
st.message_id,
18+
r.flow_slug,
19+
st.requeued_count,
20+
f.opt_timeout
21+
from pgflow.step_tasks st
22+
join pgflow.runs r on r.run_id = st.run_id
23+
join pgflow.flows f on f.flow_slug = r.flow_slug
24+
where st.status = 'started'
25+
and st.permanently_stalled_at is null
26+
and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
27+
for update of st skip locked
28+
),
29+
-- Separate tasks that can be requeued from those that exceeded max requeues
30+
to_requeue as (
31+
select * from stalled_tasks where requeued_count < max_requeues
32+
),
33+
to_archive as (
34+
select * from stalled_tasks where requeued_count >= max_requeues
35+
),
36+
-- Update tasks that will be requeued
37+
requeued as (
38+
update pgflow.step_tasks st
39+
set
40+
status = 'queued',
41+
started_at = null,
42+
last_worker_id = null,
43+
requeued_count = st.requeued_count + 1,
44+
last_requeued_at = now()
45+
from to_requeue tr
46+
where st.run_id = tr.run_id
47+
and st.step_slug = tr.step_slug
48+
and st.task_index = tr.task_index
49+
returning tr.flow_slug as queue_name, tr.message_id
50+
),
51+
-- Make requeued messages visible immediately (batched per queue)
52+
visibility_reset as (
53+
select pgflow.set_vt_batch(
54+
r.queue_name,
55+
array_agg(r.message_id),
56+
array_agg(0) -- all offsets are 0 (immediate visibility)
57+
)
58+
from requeued r
59+
where r.message_id is not null
60+
group by r.queue_name
61+
),
62+
-- Mark tasks as permanently stalled before archiving
63+
mark_permanently_stalled as (
64+
update pgflow.step_tasks st
65+
set permanently_stalled_at = now()
66+
from to_archive ta
67+
where st.run_id = ta.run_id
68+
and st.step_slug = ta.step_slug
69+
and st.task_index = ta.task_index
70+
returning st.run_id
71+
),
72+
-- Archive messages for tasks that exceeded max requeues (batched per queue)
73+
archived as (
74+
select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
75+
from to_archive ta
76+
where ta.message_id is not null
77+
group by ta.flow_slug
78+
),
79+
-- Force execution of visibility_reset CTE
80+
_vr as (select count(*) from visibility_reset),
81+
-- Force execution of mark_permanently_stalled CTE
82+
_mps as (select count(*) from mark_permanently_stalled),
83+
-- Force execution of archived CTE
84+
_ar as (select count(*) from archived)
85+
select count(*) into result_count
86+
from requeued, _vr, _mps, _ar;
87+
88+
return result_count;
89+
end;
90+
$$;
91+
-- Create "setup_requeue_stalled_tasks_cron" function
92+
CREATE FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" ("cron_interval" text DEFAULT '15 seconds') RETURNS text LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = pgflow, cron, pg_temp AS $$
93+
declare
94+
job_id bigint;
95+
begin
96+
-- Remove existing job if any
97+
begin
98+
perform cron.unschedule('pgflow_requeue_stalled_tasks');
99+
exception
100+
when others then null;
101+
end;
102+
103+
-- Schedule the new job
104+
job_id := cron.schedule(
105+
job_name => 'pgflow_requeue_stalled_tasks',
106+
schedule => setup_requeue_stalled_tasks_cron.cron_interval,
107+
command => 'select pgflow.requeue_stalled_tasks()'
108+
);
109+
110+
return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)',
111+
setup_requeue_stalled_tasks_cron.cron_interval, job_id);
112+
end;
113+
$$;
114+
-- Set comment to function: "setup_requeue_stalled_tasks_cron"
115+
COMMENT ON FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" IS 'Sets up cron job to automatically requeue stalled tasks.
116+
Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds).
117+
Replaces existing job if it exists (idempotent).
118+
Returns a confirmation message with job ID.';
119+
-- Automatically set up the cron job
120+
SELECT pgflow.setup_requeue_stalled_tasks_cron();

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g=
1+
h1:dzKOHL+hbunxWTZaGOIDWQG9THDva7Pk7VVDGASJkps=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -16,3 +16,4 @@ h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g=
1616
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
1717
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
19+
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=

0 commit comments

Comments
 (0)