fix(kanban): recover and self-heal stuck agent runs

A run could get stranded at 'running' in the UI after a crash/disconnect/
restart, with no way to clear it. Root cause was a race: the SSE history
replay re-asserted a stale `running` status that beat the poll's settled
status, leaving the run showing "Running" + the settle error at once.

Server (runs.ts / runner.ts / index.ts):
- reconcile() on every read force-settles any 'running' run with no live
  runner, so the board self-heals on the next poll (≤3s) — no restart needed.
- forceSettle() emits a persisted `status` event so an open/reconnecting
  SSE stream replays the terminal state last, not a stale `running`.
- Startup orphan-reconciliation now also emits that event (was the gap that
  let the replay re-assert `running` after a server restart).
- Idle watchdog (10min): a silent pi is settled as 'failed' instead of
  hanging forever; SIGKILL escalation (20s) reaps wedged processes.
- stop() now recovers: active→abort, orphaned-but-running→force-stop
  (the Stop button clears wedged runs instead of 409'ing).
- start() catch force-settles 'failed' so a spawn failure never orphans a
  half-created 'running' row.

Client (useOrchestrator.ts):
- patchRun refuses to un-settle a terminal run, dropping stale replayed
  status as a belt-and-suspenders guard against any such race.
EOF && echo "" && git log --oneline -3
This commit is contained in:
2026-06-17 18:53:44 -04:00
parent 6531dc00df
commit 408bdb6dd7
5 changed files with 152 additions and 15 deletions

View File

@@ -2,6 +2,7 @@ import { serve } from '@hono/node-server';
import { Hono } from 'hono'; import { Hono } from 'hono';
import './db.js'; // importing db runs migrate() import './db.js'; // importing db runs migrate()
import { db } from './db.js'; import { db } from './db.js';
import { appendRunEvent } from './orchestrator/events.js';
import { kanban } from './routes/kanban.js'; import { kanban } from './routes/kanban.js';
import { pages } from './routes/pages.js'; import { pages } from './routes/pages.js';
import { orchestrator } from './routes/orchestrator.js'; import { orchestrator } from './routes/orchestrator.js';
@@ -11,11 +12,21 @@ import { internal } from './routes/internal.js';
// marked running/queued has no live subprocess, so mark it stopped. // marked running/queued has no live subprocess, so mark it stopped.
{ {
const now = new Date().toISOString(); const now = new Date().toISOString();
const result = db const orphans = db
.prepare("UPDATE agent_runs SET status = 'stopped', finished_at = COALESCE(finished_at, ?), error = COALESCE(error, 'interrupted by server restart') WHERE status IN ('running', 'queued')") .prepare("SELECT id FROM agent_runs WHERE status IN ('running', 'queued')")
.run(now); .all() as { id: string }[];
if (result.changes > 0) { for (const { id } of orphans) {
console.log(`[api] reconciled ${result.changes} interrupted agent run(s) → stopped`); db.prepare(
"UPDATE agent_runs SET status = 'stopped', finished_at = COALESCE(finished_at, ?), error = COALESCE(error, 'interrupted by server restart') WHERE id = ?",
).run(now, id);
// Emit a persisted status event so an open/reconnecting SSE stream replays
// the terminal state last — otherwise its history replay would re-assert the
// stale `running` status and leave the UI stuck mid-run.
appendRunEvent(id, 'status', { status: 'stopped' });
appendRunEvent(id, 'log', { level: 'warn', text: 'interrupted by server restart' });
}
if (orphans.length > 0) {
console.log(`[api] reconciled ${orphans.length} interrupted agent run(s) → stopped`);
} }
} }

View File

@@ -76,6 +76,11 @@ export class Runner {
private pending = 0; private pending = 0;
/** Watchdog that force-settles a run if the agent stalls with a queued msg. */ /** Watchdog that force-settles a run if the agent stalls with a queued msg. */
private watchdog: ReturnType<typeof setTimeout> | null = null; private watchdog: ReturnType<typeof setTimeout> | null = null;
/** Liveness watchdog: if pi emits nothing for this long, it's hung (stalled
* LLM call, wedged tool, provider drop) — settle as failed so the run doesn't
* sit at 'running' forever. Reset on every line pi sends. */
private idleTimer: ReturnType<typeof setTimeout> | null = null;
private static readonly IDLE_TIMEOUT_MS = 10 * 60 * 1000;
constructor(opts: RunnerOptions) { constructor(opts: RunnerOptions) {
this.runId = opts.runId; this.runId = opts.runId;
@@ -128,6 +133,7 @@ export class Runner {
this.send({ type: 'prompt', message: this.prompt }); this.send({ type: 'prompt', message: this.prompt });
this.emit({ type: 'log', level: 'info', text: `Started pi (cwd ${this.cwd})${this.resumeSession ? ' [resumed session]' : ''}` }); this.emit({ type: 'log', level: 'info', text: `Started pi (cwd ${this.cwd})${this.resumeSession ? ' [resumed session]' : ''}` });
this.emit({ type: 'status', status: 'running' }); this.emit({ type: 'status', status: 'running' });
this.armIdle();
} }
/** Queue a steering or follow-up message mid-run. */ /** Queue a steering or follow-up message mid-run. */
@@ -152,7 +158,8 @@ export class Runner {
} catch { } catch {
/* stdin may already be closed */ /* stdin may already be closed */
} }
// Hard kill backstop if pi doesn't exit promptly after abort. // Escalating kill backstop if pi doesn't exit promptly after abort:
// SIGTERM first, then SIGKILL so a truly wedged process can still be reaped.
setTimeout(() => { setTimeout(() => {
if (this.proc && !this.settled) { if (this.proc && !this.settled) {
try { try {
@@ -162,6 +169,16 @@ export class Runner {
} }
} }
}, 15_000); }, 15_000);
// SIGKILL shortly after if SIGTERM didn't do it.
setTimeout(() => {
if (this.proc && !this.settled) {
try {
this.proc.kill('SIGKILL');
} catch {
/* ignore */
}
}
}, 20_000);
} }
// --- stdin / stdout handling -------------------------------------------- // --- stdin / stdout handling --------------------------------------------
@@ -193,6 +210,8 @@ export class Runner {
} catch { } catch {
return; return;
} }
// Any valid JSON line from pi means it's alive — push the liveness watchdog.
this.armIdle();
switch (msg.type) { switch (msg.type) {
case 'response': case 'response':
// Capture the persisted session file from the init get_state call so the // Capture the persisted session file from the init get_state call so the
@@ -264,6 +283,7 @@ export class Runner {
// close), finalize now. Normal completion is driven by `maybeComplete()`. // close), finalize now. Normal completion is driven by `maybeComplete()`.
if (this.settled) return; if (this.settled) return;
this.clearWatchdog(); this.clearWatchdog();
this.clearIdle();
const crashed = code !== null && code !== 0; const crashed = code !== null && code !== 0;
let status: RunStatus; let status: RunStatus;
if (this.stopping) status = 'stopped'; if (this.stopping) status = 'stopped';
@@ -314,6 +334,29 @@ export class Runner {
} }
} }
/** Arm (or refresh) the liveness watchdog: settle as failed if pi goes silent
* for IDLE_TIMEOUT_MS. Call `armIdle()` on every sign of life from pi. */
private armIdle(): void {
if (this.settled) return;
this.clearIdle();
this.idleTimer = setTimeout(() => {
if (!this.settled) {
this.emit({
type: 'error',
message: `Agent went idle for ${Math.round(Runner.IDLE_TIMEOUT_MS / 1000)}s with no output — treating as hung.`,
});
this.settle('failed', this.lastAssistantText || null);
}
}, Runner.IDLE_TIMEOUT_MS);
}
private clearIdle(): void {
if (this.idleTimer) {
clearTimeout(this.idleTimer);
this.idleTimer = null;
}
}
private fail(message: string): void { private fail(message: string): void {
if (this.settled) return; if (this.settled) return;
this.emit({ type: 'error', message }); this.emit({ type: 'error', message });
@@ -324,6 +367,7 @@ export class Runner {
if (this.settled) return; if (this.settled) return;
this.settled = true; this.settled = true;
this.clearWatchdog(); this.clearWatchdog();
this.clearIdle();
this.emit({ type: 'status', status }); this.emit({ type: 'status', status });
if (status === 'completed') this.emit({ type: 'done', summary: summary ?? '' }); if (status === 'completed') this.emit({ type: 'done', summary: summary ?? '' });
if (status === 'failed' && summary) this.emit({ type: 'text', text: summary }); if (status === 'failed' && summary) this.emit({ type: 'text', text: summary });

View File

@@ -139,8 +139,12 @@ class RunManager {
}); });
runner.start(); runner.start();
} catch (err) { } catch (err) {
// Persisting or starting failed — reclaim only a worktree we created. // Persisting or starting failed — reclaim only a worktree we created, and
// never leave a half-created row stuck at 'running'.
const message = err instanceof Error ? err.message : String(err);
if (worktree && ownsWorktree) removeWorktree(id, worktree.branch); if (worktree && ownsWorktree) removeWorktree(id, worktree.branch);
this.active.delete(id);
this.forceSettle(id, 'failed', `failed to start agent: ${message}`);
throw err; throw err;
} }
@@ -159,11 +163,21 @@ class RunManager {
entry.runner.message(text, mode); entry.runner.message(text, mode);
} }
/** Abort an active run. */ /** Abort an active run, or force-settle a wedged/orphaned one (recovery). */
stop(id: string): void { stop(id: string): void {
const entry = this.active.get(id); const entry = this.active.get(id);
if (!entry) throw new RunNotFoundError(id); if (entry) {
entry.runner.stop(); entry.runner.stop();
return;
}
// No live runner. If the DB still shows it running, force-settle so the
// operator can recover a wedged/orphaned run from the UI instead of it
// being stuck at 'running' with no process behind it.
const run = this.rawGet(id);
if (!run) throw new RunNotFoundError(id);
if (run.status === 'running') {
this.forceSettle(id, 'stopped', 'stopped by operator (no live agent process)');
}
} }
/** Subscribe to a run's live slim-event stream (best-effort; may be inactive). */ /** Subscribe to a run's live slim-event stream (best-effort; may be inactive). */
@@ -219,6 +233,12 @@ class RunManager {
// --- reads --------------------------------------------------------------- // --- reads ---------------------------------------------------------------
get(id: string): AgentRun | undefined { get(id: string): AgentRun | undefined {
return this.reconcile(this.rawGet(id));
}
/** Read a run WITHOUT orphan-reconciliation (used where we inspect status
* before deciding to force-settle, e.g. `stop()`). */
private rawGet(id: string): AgentRun | undefined {
const row = db.prepare('SELECT * FROM agent_runs WHERE id = ?').get(id) as AgentRunRow | undefined; const row = db.prepare('SELECT * FROM agent_runs WHERE id = ?').get(id) as AgentRunRow | undefined;
return row ? hydrateRun(row) : undefined; return row ? hydrateRun(row) : undefined;
} }
@@ -236,7 +256,7 @@ class RunManager {
const rows = db const rows = db
.prepare('SELECT * FROM agent_runs ORDER BY created_at DESC LIMIT ?') .prepare('SELECT * FROM agent_runs ORDER BY created_at DESC LIMIT ?')
.all(limit) as AgentRunRow[]; .all(limit) as AgentRunRow[];
return rows.map(hydrateRun); return rows.map((r) => this.reconcile(hydrateRun(r)) as AgentRun);
} }
/** Runs for a given card (newest first). */ /** Runs for a given card (newest first). */
@@ -244,7 +264,7 @@ class RunManager {
const rows = db const rows = db
.prepare('SELECT * FROM agent_runs WHERE card_id = ? ORDER BY created_at DESC') .prepare('SELECT * FROM agent_runs WHERE card_id = ? ORDER BY created_at DESC')
.all(cardId) as AgentRunRow[]; .all(cardId) as AgentRunRow[];
return rows.map(hydrateRun); return rows.map((r) => this.reconcile(hydrateRun(r)) as AgentRun);
} }
/** Replayable event history for a run, optionally after a sequence number. */ /** Replayable event history for a run, optionally after a sequence number. */
@@ -264,6 +284,47 @@ class RunManager {
// --- internal: finalize a run ------------------------------------------- // --- internal: finalize a run -------------------------------------------
/**
* Force-settle a run straight in the DB (no live runner). Used to recover
* runs stranded at 'running' — orphans with no live subprocess (crash,
* server restart mid-run, spawn failure) — and to honor an operator stop on a
* wedged run. Emits a persisted `status` event so any open stream reflects
* the new status immediately, and a `log` line with the reason.
*/
private forceSettle(id: string, status: RunStatus, reason: string): void {
const run = this.rawGet(id);
if (!run) return;
const now = new Date().toISOString();
// Best-effort: capture the head commit if the worktree is still around, so
// a recovered run can still be reviewed/diffed. Never wipe an existing one.
let commitSha: string | null = null;
if (run.worktreePath && isWorktreePresent(run.worktreePath)) {
commitSha = headSha(run.worktreePath);
}
appendRunEvent(id, 'status', { status });
appendRunEvent(id, 'log', { level: 'warn', text: reason });
db.prepare(
`UPDATE agent_runs
SET status = ?, error = ?, commit_sha = COALESCE(?, commit_sha), finished_at = ?
WHERE id = ?`,
).run(status, reason, commitSha, now, id);
this.active.delete(id);
}
/**
* Self-heal a run stranded at 'running' with no live runner in this process.
* Called on every read (get/list) so the board recovers automatically on the
* next poll — no manual action or server restart required. Returns the
* (possibly updated) run, or undefined if the run never existed.
*/
private reconcile(run: AgentRun | undefined): AgentRun | undefined {
if (run && run.status === 'running' && !this.active.has(run.id)) {
this.forceSettle(run.id, 'stopped', 'interrupted: no live agent process (server restarted or run crashed)');
return this.rawGet(run.id);
}
return run;
}
private settle(id: string, status: RunStatus, summary: string | null): void { private settle(id: string, status: RunStatus, summary: string | null): void {
const entry = this.active.get(id); const entry = this.active.get(id);
const now = new Date().toISOString(); const now = new Date().toISOString();

View File

@@ -87,7 +87,7 @@ orchestrator.post('/runs/:id/message', async (c) => {
} }
}); });
/** Stop an active run. */ /** Stop an active or wedged run (force-settles a run with no live process). */
orchestrator.post('/runs/:id/stop', (c) => { orchestrator.post('/runs/:id/stop', (c) => {
const id = c.req.param('id'); const id = c.req.param('id');
try { try {

View File

@@ -4,6 +4,7 @@ import {
type AgentRun, type AgentRun,
type DiffResult, type DiffResult,
type MergeResult, type MergeResult,
type RunStatus,
} from '../../lib/orchestratorApi'; } from '../../lib/orchestratorApi';
/** /**
@@ -64,6 +65,10 @@ export interface UseOrchestrator {
activeByCard: Map<string, { running: boolean; bevy: boolean; runId: string }>; activeByCard: Map<string, { running: boolean; bevy: boolean; runId: string }>;
} }
/** Terminal states a run never legitimately leaves once reached. Used by
* `patchRun` to reject stale history replays that would un-settle a run. */
const TERMINAL_STATUS: ReadonlySet<RunStatus> = new Set(['completed', 'failed', 'stopped']);
export function useOrchestrator(): UseOrchestrator { export function useOrchestrator(): UseOrchestrator {
const [runs, setRuns] = useState<AgentRun[]>([]); const [runs, setRuns] = useState<AgentRun[]>([]);
const [bevyRunning, setBevyRunning] = useState<Set<string>>(new Set()); const [bevyRunning, setBevyRunning] = useState<Set<string>>(new Set());
@@ -144,9 +149,25 @@ export function useOrchestrator(): UseOrchestrator {
setRuns((prev) => prev.filter((r) => r.id !== runId)); setRuns((prev) => prev.filter((r) => r.id !== runId));
}, []); }, []);
/** Apply a partial update to a run (status/summary/etc., from stream events). */ /**
* Apply a partial update to a run (status/summary/etc., from stream events).
* Guards against stale history replays un-settling a run: a terminal run can
* never legitimately return to `running`/`queued`, so such a patch (e.g. a
* reconnect re-emitting an old `running` status) is dropped.
*/
const patchRun = useCallback((runId: string, patch: Partial<AgentRun>) => { const patchRun = useCallback((runId: string, patch: Partial<AgentRun>) => {
setRuns((prev) => prev.map((r) => (r.id === runId ? { ...r, ...patch } : r))); setRuns((prev) => {
const cur = prev.find((r) => r.id === runId);
if (
cur &&
TERMINAL_STATUS.has(cur.status) &&
patch.status !== undefined &&
!TERMINAL_STATUS.has(patch.status)
) {
return prev; // ignore stale status re-assertion
}
return prev.map((r) => (r.id === runId ? { ...r, ...patch } : r));
});
}, []); }, []);
/** Reflect a Bevy playtest lifecycle change (from stream events). */ /** Reflect a Bevy playtest lifecycle change (from stream events). */