From 408bdb6dd7dc7e009782d763d2323ab4735a0207 Mon Sep 17 00:00:00 2001 From: francy51 Date: Wed, 17 Jun 2026 18:53:44 -0400 Subject: [PATCH] fix(kanban): recover and self-heal stuck agent runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A run could get stranded at 'running' in the UI after a crash/disconnect/ restart, with no way to clear it. Root cause was a race: the SSE history replay re-asserted a stale `running` status that beat the poll's settled status, leaving the run showing "Running" + the settle error at once. Server (runs.ts / runner.ts / index.ts): - reconcile() on every read force-settles any 'running' run with no live runner, so the board self-heals on the next poll (≤3s) — no restart needed. - forceSettle() emits a persisted `status` event so an open/reconnecting SSE stream replays the terminal state last, not a stale `running`. - Startup orphan-reconciliation now also emits that event (was the gap that let the replay re-assert `running` after a server restart). - Idle watchdog (10min): a silent pi is settled as 'failed' instead of hanging forever; SIGKILL escalation (20s) reaps wedged processes. - stop() now recovers: active→abort, orphaned-but-running→force-stop (the Stop button clears wedged runs instead of 409'ing). - start() catch force-settles 'failed' so a spawn failure never orphans a half-created 'running' row. Client (useOrchestrator.ts): - patchRun refuses to un-settle a terminal run, dropping stale replayed status as a belt-and-suspenders guard against any such race. EOF && echo "" && git log --oneline -3 --- apps/api/src/index.ts | 21 ++++-- apps/api/src/orchestrator/runner.ts | 46 +++++++++++- apps/api/src/orchestrator/runs.ts | 73 +++++++++++++++++-- apps/api/src/routes/orchestrator.ts | 2 +- .../src/components/kanban/useOrchestrator.ts | 25 ++++++- 5 files changed, 152 insertions(+), 15 deletions(-) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 650bd8d..350152f 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -2,6 +2,7 @@ import { serve } from '@hono/node-server'; import { Hono } from 'hono'; import './db.js'; // importing db runs migrate() import { db } from './db.js'; +import { appendRunEvent } from './orchestrator/events.js'; import { kanban } from './routes/kanban.js'; import { pages } from './routes/pages.js'; import { orchestrator } from './routes/orchestrator.js'; @@ -11,11 +12,21 @@ import { internal } from './routes/internal.js'; // marked running/queued has no live subprocess, so mark it stopped. { const now = new Date().toISOString(); - const result = db - .prepare("UPDATE agent_runs SET status = 'stopped', finished_at = COALESCE(finished_at, ?), error = COALESCE(error, 'interrupted by server restart') WHERE status IN ('running', 'queued')") - .run(now); - if (result.changes > 0) { - console.log(`[api] reconciled ${result.changes} interrupted agent run(s) → stopped`); + const orphans = db + .prepare("SELECT id FROM agent_runs WHERE status IN ('running', 'queued')") + .all() as { id: string }[]; + for (const { id } of orphans) { + db.prepare( + "UPDATE agent_runs SET status = 'stopped', finished_at = COALESCE(finished_at, ?), error = COALESCE(error, 'interrupted by server restart') WHERE id = ?", + ).run(now, id); + // Emit a persisted status event so an open/reconnecting SSE stream replays + // the terminal state last — otherwise its history replay would re-assert the + // stale `running` status and leave the UI stuck mid-run. + appendRunEvent(id, 'status', { status: 'stopped' }); + appendRunEvent(id, 'log', { level: 'warn', text: 'interrupted by server restart' }); + } + if (orphans.length > 0) { + console.log(`[api] reconciled ${orphans.length} interrupted agent run(s) → stopped`); } } diff --git a/apps/api/src/orchestrator/runner.ts b/apps/api/src/orchestrator/runner.ts index e9b7204..a131b3e 100644 --- a/apps/api/src/orchestrator/runner.ts +++ b/apps/api/src/orchestrator/runner.ts @@ -76,6 +76,11 @@ export class Runner { private pending = 0; /** Watchdog that force-settles a run if the agent stalls with a queued msg. */ private watchdog: ReturnType | null = null; + /** Liveness watchdog: if pi emits nothing for this long, it's hung (stalled + * LLM call, wedged tool, provider drop) — settle as failed so the run doesn't + * sit at 'running' forever. Reset on every line pi sends. */ + private idleTimer: ReturnType | null = null; + private static readonly IDLE_TIMEOUT_MS = 10 * 60 * 1000; constructor(opts: RunnerOptions) { this.runId = opts.runId; @@ -128,6 +133,7 @@ export class Runner { this.send({ type: 'prompt', message: this.prompt }); this.emit({ type: 'log', level: 'info', text: `Started pi (cwd ${this.cwd})${this.resumeSession ? ' [resumed session]' : ''}` }); this.emit({ type: 'status', status: 'running' }); + this.armIdle(); } /** Queue a steering or follow-up message mid-run. */ @@ -152,7 +158,8 @@ export class Runner { } catch { /* stdin may already be closed */ } - // Hard kill backstop if pi doesn't exit promptly after abort. + // Escalating kill backstop if pi doesn't exit promptly after abort: + // SIGTERM first, then SIGKILL so a truly wedged process can still be reaped. setTimeout(() => { if (this.proc && !this.settled) { try { @@ -162,6 +169,16 @@ export class Runner { } } }, 15_000); + // SIGKILL shortly after if SIGTERM didn't do it. + setTimeout(() => { + if (this.proc && !this.settled) { + try { + this.proc.kill('SIGKILL'); + } catch { + /* ignore */ + } + } + }, 20_000); } // --- stdin / stdout handling -------------------------------------------- @@ -193,6 +210,8 @@ export class Runner { } catch { return; } + // Any valid JSON line from pi means it's alive — push the liveness watchdog. + this.armIdle(); switch (msg.type) { case 'response': // Capture the persisted session file from the init get_state call so the @@ -264,6 +283,7 @@ export class Runner { // close), finalize now. Normal completion is driven by `maybeComplete()`. if (this.settled) return; this.clearWatchdog(); + this.clearIdle(); const crashed = code !== null && code !== 0; let status: RunStatus; if (this.stopping) status = 'stopped'; @@ -314,6 +334,29 @@ export class Runner { } } + /** Arm (or refresh) the liveness watchdog: settle as failed if pi goes silent + * for IDLE_TIMEOUT_MS. Call `armIdle()` on every sign of life from pi. */ + private armIdle(): void { + if (this.settled) return; + this.clearIdle(); + this.idleTimer = setTimeout(() => { + if (!this.settled) { + this.emit({ + type: 'error', + message: `Agent went idle for ${Math.round(Runner.IDLE_TIMEOUT_MS / 1000)}s with no output — treating as hung.`, + }); + this.settle('failed', this.lastAssistantText || null); + } + }, Runner.IDLE_TIMEOUT_MS); + } + + private clearIdle(): void { + if (this.idleTimer) { + clearTimeout(this.idleTimer); + this.idleTimer = null; + } + } + private fail(message: string): void { if (this.settled) return; this.emit({ type: 'error', message }); @@ -324,6 +367,7 @@ export class Runner { if (this.settled) return; this.settled = true; this.clearWatchdog(); + this.clearIdle(); this.emit({ type: 'status', status }); if (status === 'completed') this.emit({ type: 'done', summary: summary ?? '' }); if (status === 'failed' && summary) this.emit({ type: 'text', text: summary }); diff --git a/apps/api/src/orchestrator/runs.ts b/apps/api/src/orchestrator/runs.ts index 060e5fc..cea962f 100644 --- a/apps/api/src/orchestrator/runs.ts +++ b/apps/api/src/orchestrator/runs.ts @@ -139,8 +139,12 @@ class RunManager { }); runner.start(); } catch (err) { - // Persisting or starting failed — reclaim only a worktree we created. + // Persisting or starting failed — reclaim only a worktree we created, and + // never leave a half-created row stuck at 'running'. + const message = err instanceof Error ? err.message : String(err); if (worktree && ownsWorktree) removeWorktree(id, worktree.branch); + this.active.delete(id); + this.forceSettle(id, 'failed', `failed to start agent: ${message}`); throw err; } @@ -159,11 +163,21 @@ class RunManager { entry.runner.message(text, mode); } - /** Abort an active run. */ + /** Abort an active run, or force-settle a wedged/orphaned one (recovery). */ stop(id: string): void { const entry = this.active.get(id); - if (!entry) throw new RunNotFoundError(id); - entry.runner.stop(); + if (entry) { + entry.runner.stop(); + return; + } + // No live runner. If the DB still shows it running, force-settle so the + // operator can recover a wedged/orphaned run from the UI instead of it + // being stuck at 'running' with no process behind it. + const run = this.rawGet(id); + if (!run) throw new RunNotFoundError(id); + if (run.status === 'running') { + this.forceSettle(id, 'stopped', 'stopped by operator (no live agent process)'); + } } /** Subscribe to a run's live slim-event stream (best-effort; may be inactive). */ @@ -219,6 +233,12 @@ class RunManager { // --- reads --------------------------------------------------------------- get(id: string): AgentRun | undefined { + return this.reconcile(this.rawGet(id)); + } + + /** Read a run WITHOUT orphan-reconciliation (used where we inspect status + * before deciding to force-settle, e.g. `stop()`). */ + private rawGet(id: string): AgentRun | undefined { const row = db.prepare('SELECT * FROM agent_runs WHERE id = ?').get(id) as AgentRunRow | undefined; return row ? hydrateRun(row) : undefined; } @@ -236,7 +256,7 @@ class RunManager { const rows = db .prepare('SELECT * FROM agent_runs ORDER BY created_at DESC LIMIT ?') .all(limit) as AgentRunRow[]; - return rows.map(hydrateRun); + return rows.map((r) => this.reconcile(hydrateRun(r)) as AgentRun); } /** Runs for a given card (newest first). */ @@ -244,7 +264,7 @@ class RunManager { const rows = db .prepare('SELECT * FROM agent_runs WHERE card_id = ? ORDER BY created_at DESC') .all(cardId) as AgentRunRow[]; - return rows.map(hydrateRun); + return rows.map((r) => this.reconcile(hydrateRun(r)) as AgentRun); } /** Replayable event history for a run, optionally after a sequence number. */ @@ -264,6 +284,47 @@ class RunManager { // --- internal: finalize a run ------------------------------------------- + /** + * Force-settle a run straight in the DB (no live runner). Used to recover + * runs stranded at 'running' — orphans with no live subprocess (crash, + * server restart mid-run, spawn failure) — and to honor an operator stop on a + * wedged run. Emits a persisted `status` event so any open stream reflects + * the new status immediately, and a `log` line with the reason. + */ + private forceSettle(id: string, status: RunStatus, reason: string): void { + const run = this.rawGet(id); + if (!run) return; + const now = new Date().toISOString(); + // Best-effort: capture the head commit if the worktree is still around, so + // a recovered run can still be reviewed/diffed. Never wipe an existing one. + let commitSha: string | null = null; + if (run.worktreePath && isWorktreePresent(run.worktreePath)) { + commitSha = headSha(run.worktreePath); + } + appendRunEvent(id, 'status', { status }); + appendRunEvent(id, 'log', { level: 'warn', text: reason }); + db.prepare( + `UPDATE agent_runs + SET status = ?, error = ?, commit_sha = COALESCE(?, commit_sha), finished_at = ? + WHERE id = ?`, + ).run(status, reason, commitSha, now, id); + this.active.delete(id); + } + + /** + * Self-heal a run stranded at 'running' with no live runner in this process. + * Called on every read (get/list) so the board recovers automatically on the + * next poll — no manual action or server restart required. Returns the + * (possibly updated) run, or undefined if the run never existed. + */ + private reconcile(run: AgentRun | undefined): AgentRun | undefined { + if (run && run.status === 'running' && !this.active.has(run.id)) { + this.forceSettle(run.id, 'stopped', 'interrupted: no live agent process (server restarted or run crashed)'); + return this.rawGet(run.id); + } + return run; + } + private settle(id: string, status: RunStatus, summary: string | null): void { const entry = this.active.get(id); const now = new Date().toISOString(); diff --git a/apps/api/src/routes/orchestrator.ts b/apps/api/src/routes/orchestrator.ts index 4b4a929..4ffc08d 100644 --- a/apps/api/src/routes/orchestrator.ts +++ b/apps/api/src/routes/orchestrator.ts @@ -87,7 +87,7 @@ orchestrator.post('/runs/:id/message', async (c) => { } }); -/** Stop an active run. */ +/** Stop an active or wedged run (force-settles a run with no live process). */ orchestrator.post('/runs/:id/stop', (c) => { const id = c.req.param('id'); try { diff --git a/apps/docs/src/components/kanban/useOrchestrator.ts b/apps/docs/src/components/kanban/useOrchestrator.ts index 8f9c366..cf79bca 100644 --- a/apps/docs/src/components/kanban/useOrchestrator.ts +++ b/apps/docs/src/components/kanban/useOrchestrator.ts @@ -4,6 +4,7 @@ import { type AgentRun, type DiffResult, type MergeResult, + type RunStatus, } from '../../lib/orchestratorApi'; /** @@ -64,6 +65,10 @@ export interface UseOrchestrator { activeByCard: Map; } +/** Terminal states a run never legitimately leaves once reached. Used by + * `patchRun` to reject stale history replays that would un-settle a run. */ +const TERMINAL_STATUS: ReadonlySet = new Set(['completed', 'failed', 'stopped']); + export function useOrchestrator(): UseOrchestrator { const [runs, setRuns] = useState([]); const [bevyRunning, setBevyRunning] = useState>(new Set()); @@ -144,9 +149,25 @@ export function useOrchestrator(): UseOrchestrator { setRuns((prev) => prev.filter((r) => r.id !== runId)); }, []); - /** Apply a partial update to a run (status/summary/etc., from stream events). */ + /** + * Apply a partial update to a run (status/summary/etc., from stream events). + * Guards against stale history replays un-settling a run: a terminal run can + * never legitimately return to `running`/`queued`, so such a patch (e.g. a + * reconnect re-emitting an old `running` status) is dropped. + */ const patchRun = useCallback((runId: string, patch: Partial) => { - setRuns((prev) => prev.map((r) => (r.id === runId ? { ...r, ...patch } : r))); + setRuns((prev) => { + const cur = prev.find((r) => r.id === runId); + if ( + cur && + TERMINAL_STATUS.has(cur.status) && + patch.status !== undefined && + !TERMINAL_STATUS.has(patch.status) + ) { + return prev; // ignore stale status re-assertion + } + return prev.map((r) => (r.id === runId ? { ...r, ...patch } : r)); + }); }, []); /** Reflect a Bevy playtest lifecycle change (from stream events). */