From 6531dc00dfe0a753f32ef30fb698d9cf29c540ac Mon Sep 17 00:00:00 2001 From: francy51 Date: Wed, 17 Jun 2026 18:34:05 -0400 Subject: [PATCH] feat(kanban): resume runs' chat via Refine + isolate the event stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two intertwined changes that both touch the orchestrator hook + run console: Isolate the agent event stream (perf): - useRunStream owns the SSE stream + event log locally inside AgentRunBar, so a burst of streamed events re-renders only the console — never the board page or card modal (which was causing frame drops at run start). - useOrchestrator is now a registry only; lifecycle events reflect back up via stable patchRun/reflectBevy reflectors (effect deps depend on those, not the whole object, avoiding a stream-teardown loop). Session resume for Refine: - Runs now persist their pi session (drop --no-session); each fresh run captures its session JSONL path into a new agent_runs.session_file column (additive, idempotent migration). - Refine resumes the prior run's actual session (--session → appends) in that run's own worktree (inherited, never owned), sending the operator's feedback as the next message in the same conversation with full prior context. - owns_worktree guards remove()/cleanup so a refinement never destroys the owning run's worktree; bad refinement targets return 409. - AgentRunBar shows Refine only for settled runs with a recorded session. EOF && echo "" && git log --oneline -3 --- apps/api/src/db.ts | 17 ++ apps/api/src/orchestrator/prompt.ts | 1 + apps/api/src/orchestrator/runner.ts | 40 ++- apps/api/src/orchestrator/runs.ts | 94 ++++++- apps/api/src/routes/orchestrator.ts | 7 +- apps/api/src/types.ts | 5 + .../src/components/kanban/AgentRunBar.tsx | 86 ++++++- .../src/components/kanban/useOrchestrator.ts | 243 ++++++------------ .../src/components/kanban/useRunStream.ts | 71 +++++ apps/docs/src/lib/orchestratorApi.ts | 9 + 10 files changed, 391 insertions(+), 182 deletions(-) create mode 100644 apps/docs/src/components/kanban/useRunStream.ts diff --git a/apps/api/src/db.ts b/apps/api/src/db.ts index 7e3b90a..cdb4dba 100644 --- a/apps/api/src/db.ts +++ b/apps/api/src/db.ts @@ -147,6 +147,18 @@ export function migrate(): void { CREATE INDEX IF NOT EXISTS idx_agent_runs_status ON agent_runs(status); CREATE INDEX IF NOT EXISTS idx_agent_events_run ON agent_run_events(run_id, seq); `); + + // Additive columns for agent_runs: session persistence (so refinement runs + // can resume a prior run's chat) and worktree ownership (refinement runs + // inherit, never own, a worktree). Idempotent: only adds what's missing. + const runCols = db.prepare('PRAGMA table_info(agent_runs)').all() as { name: string }[]; + const runColNames = new Set(runCols.map((c) => c.name)); + if (!runColNames.has('session_file')) { + db.exec('ALTER TABLE agent_runs ADD COLUMN session_file TEXT'); + } + if (!runColNames.has('owns_worktree')) { + db.exec('ALTER TABLE agent_runs ADD COLUMN owns_worktree INTEGER NOT NULL DEFAULT 1'); + } } /** Row shape for the agent_runs table. */ @@ -155,8 +167,13 @@ export interface AgentRunRow { card_id: string; status: string; use_worktree: number; + /** Whether this run created (and therefore owns/cleans up) its worktree. + * Refinement runs inherit a prior run's worktree and set this to 0. */ + owns_worktree: number; branch: string | null; worktree_path: string | null; + /** pi session JSONL path persisted for this run (resumable by refinements). */ + session_file: string | null; prompt: string; summary: string | null; token: string; diff --git a/apps/api/src/orchestrator/prompt.ts b/apps/api/src/orchestrator/prompt.ts index fac5824..98f56a9 100644 --- a/apps/api/src/orchestrator/prompt.ts +++ b/apps/api/src/orchestrator/prompt.ts @@ -153,3 +153,4 @@ ${describeCard(card)} Begin now. Start by reading the relevant docs and code, then implement. `; } + diff --git a/apps/api/src/orchestrator/runner.ts b/apps/api/src/orchestrator/runner.ts index d4cdd2b..e9b7204 100644 --- a/apps/api/src/orchestrator/runner.ts +++ b/apps/api/src/orchestrator/runner.ts @@ -41,15 +41,28 @@ export interface RunnerOptions { cwd: string; /** Initial prompt. */ prompt: string; + /** + * If set, resume (append to) this pi session JSONL file instead of starting a + * fresh conversation. Used by refinement runs to continue a prior run's chat. + */ + resumeSession?: string; /** Called once when the run settles (status is final). */ onSettled: (status: RunStatus, summary: string | null) => void; + /** + * Called with the persisted session JSONL path once pi reports it (fresh runs + * only — resumed runs already know their session file). Lets the manager + * record it so later refinements can resume this run's chat. + */ + onSessionResolved?: (sessionFile: string) => void; } export class Runner { readonly runId: string; private readonly cwd: string; private readonly prompt: string; + private readonly resumeSession: string | undefined; private readonly onSettled: (status: RunStatus, summary: string | null) => void; + private readonly onSessionResolved: ((sessionFile: string) => void) | undefined; private proc: ChildProcess | null = null; private listeners = new Set(); @@ -68,7 +81,9 @@ export class Runner { this.runId = opts.runId; this.cwd = opts.cwd; this.prompt = opts.prompt; + this.resumeSession = opts.resumeSession; this.onSettled = opts.onSettled; + this.onSessionResolved = opts.onSessionResolved; } /** Subscribe to live slim events. Returns an unsubscribe function. */ @@ -78,7 +93,10 @@ export class Runner { } /** Begin the run: spawn pi and send the prompt. */ start(): void { - const args = ['--mode', 'rpc', '--no-session', '--approve', '-n', `kanban:${this.runId.slice(0, 8)}`, '--tools', PI_TOOLS]; + const args = ['--mode', 'rpc', '--approve', '-n', `kanban:${this.runId.slice(0, 8)}`, '--tools', PI_TOOLS]; + // Refinement runs resume (append to) a prior run's session file; fresh runs + // persist a new session (no --no-session) so they can be refined later. + if (this.resumeSession) args.push('--session', this.resumeSession); if (PI_MODEL) args.push('--model', PI_MODEL); this.proc = spawn(PI_BIN, args, { @@ -101,8 +119,14 @@ export class Runner { this.proc.on('error', (err) => this.fail(`failed to spawn pi: ${err.message}`)); this.proc.on('exit', (code, signal) => this.onExit(code, signal)); + // Capture the persisted session file path up front (fresh runs) so the + // manager can record it for later refinement. Resumed runs already know + // their session. Sent before the prompt so it resolves immediately. + if (!this.resumeSession && this.onSessionResolved) { + this.send({ type: 'get_state', id: 'init-state' }); + } this.send({ type: 'prompt', message: this.prompt }); - this.emit({ type: 'log', level: 'info', text: `Started pi (cwd ${this.cwd})` }); + this.emit({ type: 'log', level: 'info', text: `Started pi (cwd ${this.cwd})${this.resumeSession ? ' [resumed session]' : ''}` }); this.emit({ type: 'status', status: 'running' }); } @@ -171,7 +195,17 @@ export class Runner { } switch (msg.type) { case 'response': - // Command ack; surface failures. + // Capture the persisted session file from the init get_state call so the + // manager can record it for later refinement runs. + if ( + msg.id === 'init-state' && + msg.success === true && + this.onSessionResolved + ) { + const sf = (msg.data as { sessionFile?: string } | undefined)?.sessionFile; + if (typeof sf === 'string' && sf) this.onSessionResolved(sf); + } + // Surface command failures. if (msg.success === false) { this.emit({ type: 'log', level: 'error', text: `command ${String(msg.command ?? '?')} rejected` }); } diff --git a/apps/api/src/orchestrator/runs.ts b/apps/api/src/orchestrator/runs.ts index 07ce781..060e5fc 100644 --- a/apps/api/src/orchestrator/runs.ts +++ b/apps/api/src/orchestrator/runs.ts @@ -25,6 +25,13 @@ export interface StartRunInput { useWorktree?: boolean; /** If true, delete the worktree + branch when the run settles. */ cleanupOnFinish?: boolean; + /** + * If set, start a refinement run: a fresh worktree branched from this prior + * run's branch (so its commits/work are present), seeded with `prompt` as the + * operator's refinement feedback. Only valid when the prior run is settled + * and its worktree is still present. + */ + refineRunId?: string; } class RunManager { @@ -40,37 +47,100 @@ class RunManager { const useWorktree = input.useWorktree ?? true; const now = new Date().toISOString(); - // Provision the worktree (or fall back to the repo root). + // A refinement appends to a prior run's chat: it resumes that run's pi + // session in the SAME worktree (so the working tree matches the + // conversation) and is itself NOT the worktree's owner. Resolve it first so + // any error surfaces before we create anything. + let refineOf: { worktree: Worktree; sessionFile: string } | null = null; + if (input.refineRunId) { + if (!useWorktree) throw new Error('a refinement run requires a worktree'); + const prior = this.get(input.refineRunId); + if (!prior) throw new Error(`refinement source run not found: ${input.refineRunId}`); + if (prior.cardId !== card.id) throw new Error('refinement source run belongs to a different card'); + if ( + !prior.ownsWorktree || + !prior.worktreePath || + !prior.branch || + !isWorktreePresent(prior.worktreePath) + ) { + throw new Error('refinement source run has no worktree (it was cleaned up); start a fresh run instead'); + } + if (!prior.sessionFile) { + throw new Error('refinement source run has no recorded session to resume'); + } + refineOf = { + worktree: { path: prior.worktreePath, branch: prior.branch }, + sessionFile: prior.sessionFile, + }; + } + + // Provision the worktree. Refinement runs inherit the prior run's worktree + // (and never own it); fresh runs create their own (or fall back to repo root). let worktree: Worktree | null = null; let branch: string | null = null; let worktreePath: string | null = null; - if (useWorktree) { + if (refineOf) { + worktree = refineOf.worktree; + branch = refineOf.worktree.branch; + worktreePath = refineOf.worktree.path; + } else if (useWorktree) { worktree = createWorktree(id, card.id); branch = worktree.branch; worktreePath = worktree.path; } + const ownsWorktree = !refineOf; - const prompt = buildPrompt(card, { token, runId: id }, input.prompt); + // Fresh runs get the full agent contract prompt; refinement runs just send + // the operator's feedback as a new user turn in the resumed session (the + // prior system prompt + conversation are already in the session history). + const prompt = refineOf + ? input.prompt?.trim() || '(no specific changes requested — review the work so far and improve it).' + : buildPrompt(card, { token, runId: id }, input.prompt); try { db.prepare( `INSERT INTO agent_runs - (id, card_id, status, use_worktree, branch, worktree_path, prompt, token, created_at, started_at) - VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?, ?)`, - ).run(id, card.id, useWorktree ? 1 : 0, branch, worktreePath, prompt, token, now, now); + (id, card_id, status, use_worktree, owns_worktree, branch, worktree_path, session_file, prompt, token, created_at, started_at) + VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ).run( + id, + card.id, + useWorktree ? 1 : 0, + ownsWorktree ? 1 : 0, + branch, + worktreePath, + refineOf?.sessionFile ?? null, + prompt, + token, + now, + now, + ); const cwd = worktreePath ?? REPO_ROOT; const runner = new Runner({ runId: id, cwd, prompt, + resumeSession: refineOf?.sessionFile, onSettled: (status, summary) => this.settle(id, status, summary), + // Only fresh runs need to discover + persist their session file; resumed + // runs already point at the prior session via session_file above. + onSessionResolved: refineOf + ? undefined + : (sessionFile) => { + db.prepare('UPDATE agent_runs SET session_file = ? WHERE id = ?').run(sessionFile, id); + }, + }); + // A refinement run never cleans up the worktree it inherited. + this.active.set(id, { + runner, + worktree, + cleanup: ownsWorktree && (input.cleanupOnFinish ?? false), }); - this.active.set(id, { runner, worktree, cleanup: input.cleanupOnFinish ?? false }); runner.start(); } catch (err) { - // Persisting or starting failed — reclaim the worktree so we never leak one. - if (worktree) removeWorktree(id, worktree.branch); + // Persisting or starting failed — reclaim only a worktree we created. + if (worktree && ownsWorktree) removeWorktree(id, worktree.branch); throw err; } @@ -111,7 +181,9 @@ class RunManager { this.bevy.get(id)?.stop(); this.bevy.delete(id); const row = this.get(id); - if (row?.worktreePath && row.branch) removeWorktree(id, row.branch); + // Only the run that created a worktree may reclaim it; refinement runs + // inherit theirs and must never delete the owning run's worktree. + if (row?.ownsWorktree && row.worktreePath && row.branch) removeWorktree(id, row.branch); db.prepare('DELETE FROM agent_run_events WHERE run_id = ?').run(id); db.prepare('DELETE FROM agent_runs WHERE id = ?').run(id); } @@ -237,8 +309,10 @@ function hydrateRun(row: AgentRunRow): AgentRun { return { cardId: row.card_id, status: row.status as RunStatus, useWorktree: row.use_worktree === 1, + ownsWorktree: row.owns_worktree === 1, branch: row.branch, worktreePath: row.worktree_path, + sessionFile: row.session_file, prompt: row.prompt, summary: row.summary, commitSha: row.commit_sha, diff --git a/apps/api/src/routes/orchestrator.ts b/apps/api/src/routes/orchestrator.ts index 3cd62dc..4b4a929 100644 --- a/apps/api/src/routes/orchestrator.ts +++ b/apps/api/src/routes/orchestrator.ts @@ -58,13 +58,16 @@ orchestrator.post('/runs', async (c) => { const prompt = typeof body.prompt === 'string' ? body.prompt : undefined; const useWorktree = body.useWorktree !== false; // default true const cleanupOnFinish = body.cleanupOnFinish === true; + const refineRunId = typeof body.refineRunId === 'string' ? body.refineRunId : undefined; try { - const run = runManager.start(card, { cardId, prompt, useWorktree, cleanupOnFinish }); + const run = runManager.start(card, { cardId, prompt, useWorktree, cleanupOnFinish, refineRunId }); return c.json({ run }, 201); } catch (err) { const message = err instanceof Error ? err.message : 'failed to start run'; - return c.json({ error: message }, 500); + // A bad refinement target (missing run / cleaned-up worktree) is a client error. + const status = message.startsWith('refinement') ? 409 : 500; + return c.json({ error: message }, status); } }); diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index c88620d..2396cd1 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -90,8 +90,13 @@ export interface AgentRun { cardId: string; status: RunStatus; useWorktree: boolean; + /** Whether this run created (and therefore owns/cleans up) its worktree. + * Refinement runs inherit a prior run's worktree and set this to false. */ + ownsWorktree: boolean; branch: string | null; worktreePath: string | null; + /** pi session JSONL path persisted for this run (resumable by refinements). */ + sessionFile: string | null; prompt: string; summary: string | null; commitSha: string | null; diff --git a/apps/docs/src/components/kanban/AgentRunBar.tsx b/apps/docs/src/components/kanban/AgentRunBar.tsx index 6128c4d..bcdea96 100644 --- a/apps/docs/src/components/kanban/AgentRunBar.tsx +++ b/apps/docs/src/components/kanban/AgentRunBar.tsx @@ -3,6 +3,7 @@ import type { Card } from '../../lib/kanbanApi'; import type { UseOrchestrator } from './useOrchestrator'; import { DiffModal } from './DiffModal'; import { RunEventList } from './RunEventList'; +import { useRunStream } from './useRunStream'; /** * Inline agentic console for one kanban card. @@ -41,6 +42,8 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) { const [promptDraft, setPromptDraft] = useState(''); const [steerDraft, setSteerDraft] = useState(''); const [showPrompt, setShowPrompt] = useState(false); + const [refineDraft, setRefineDraft] = useState(''); + const [showRefine, setShowRefine] = useState(false); const [showDiff, setShowDiff] = useState(false); const [mergeResult, setMergeResult] = useState<{ ok: boolean; text: string } | null>(null); const [mergeBusy, setMergeBusy] = useState(false); @@ -50,13 +53,8 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) { const bevyRunning = run ? orch.bevyIsRunning(run.id) : false; - // Stream the run's events while it exists (agent run OR a later Bevy test); - // the card stays expanded so playtest output keeps flowing after settle. - useEffect(() => { - if (!run) return; - orch.watch(run.id); - return () => orch.unwatch(run.id); - }, [run, orch]); + // (The live event stream is owned locally by `useRunStream` below, so a burst + // of agent events re-renders only this console — never the board or modal.) // Sync Bevy status from the server when the run changes (truth after a // reconnect/refresh; live lifecycle is mirrored via `bevy` events). @@ -64,7 +62,7 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) { if (run) void orch.refreshBevyStatus(run.id); }, [run, orch]); - const events = run ? orch.eventsForRun(run.id) : []; + const events = useRunStream(run?.id ?? null, orch); useEffect(() => { if (logRef.current) logRef.current.scrollTop = logRef.current.scrollHeight; }, [events.length]); @@ -93,6 +91,29 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) { } }; + /** + * Start a refinement run: continues this run's branch (prior commits present) + * seeded with the operator's feedback. The new run replaces this one in the + * console since `runForCard` returns the active run. + */ + const refine = async () => { + if (!run) return; + setBusy(true); + try { + await orch.start({ + cardId: card.id, + prompt: refineDraft.trim() || undefined, + refineRunId: run.id, + }); + setRefineDraft(''); + setShowRefine(false); + } catch (e) { + alert(e instanceof Error ? e.message : 'Failed to start refinement'); + } finally { + setBusy(false); + } + }; + const steer = async () => { if (!run || !steerDraft.trim()) return; setBusy(true); @@ -173,9 +194,29 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) { )} {settled && ( <> - + {hasWorktree && run.sessionFile && ( + + )} @@ -219,6 +260,33 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) { )} + {/* Refinement: resume this run's conversation with operator feedback */} + {showRefine && settled && hasWorktree && run.sessionFile && ( +
+