feat(kanban): resume runs' chat via Refine + isolate the event stream

Two intertwined changes that both touch the orchestrator hook + run console:

Isolate the agent event stream (perf):
- useRunStream owns the SSE stream + event log locally inside AgentRunBar, so a
  burst of streamed events re-renders only the console — never the board page or
  card modal (which was causing frame drops at run start).
- useOrchestrator is now a registry only; lifecycle events reflect back up via
  stable patchRun/reflectBevy reflectors (effect deps depend on those, not the
  whole object, avoiding a stream-teardown loop).

Session resume for Refine:
- Runs now persist their pi session (drop --no-session); each fresh run captures
  its session JSONL path into a new agent_runs.session_file column (additive,
  idempotent migration).
- Refine resumes the prior run's actual session (--session <path> → appends) in
  that run's own worktree (inherited, never owned), sending the operator's
  feedback as the next message in the same conversation with full prior context.
- owns_worktree guards remove()/cleanup so a refinement never destroys the
  owning run's worktree; bad refinement targets return 409.
- AgentRunBar shows Refine only for settled runs with a recorded session.
EOF && echo "" && git log --oneline -3
This commit is contained in:
2026-06-17 18:34:05 -04:00
parent 407bc4f790
commit 6531dc00df
10 changed files with 391 additions and 182 deletions

View File

@@ -147,6 +147,18 @@ export function migrate(): void {
CREATE INDEX IF NOT EXISTS idx_agent_runs_status ON agent_runs(status); CREATE INDEX IF NOT EXISTS idx_agent_runs_status ON agent_runs(status);
CREATE INDEX IF NOT EXISTS idx_agent_events_run ON agent_run_events(run_id, seq); CREATE INDEX IF NOT EXISTS idx_agent_events_run ON agent_run_events(run_id, seq);
`); `);
// Additive columns for agent_runs: session persistence (so refinement runs
// can resume a prior run's chat) and worktree ownership (refinement runs
// inherit, never own, a worktree). Idempotent: only adds what's missing.
const runCols = db.prepare('PRAGMA table_info(agent_runs)').all() as { name: string }[];
const runColNames = new Set(runCols.map((c) => c.name));
if (!runColNames.has('session_file')) {
db.exec('ALTER TABLE agent_runs ADD COLUMN session_file TEXT');
}
if (!runColNames.has('owns_worktree')) {
db.exec('ALTER TABLE agent_runs ADD COLUMN owns_worktree INTEGER NOT NULL DEFAULT 1');
}
} }
/** Row shape for the agent_runs table. */ /** Row shape for the agent_runs table. */
@@ -155,8 +167,13 @@ export interface AgentRunRow {
card_id: string; card_id: string;
status: string; status: string;
use_worktree: number; use_worktree: number;
/** Whether this run created (and therefore owns/cleans up) its worktree.
* Refinement runs inherit a prior run's worktree and set this to 0. */
owns_worktree: number;
branch: string | null; branch: string | null;
worktree_path: string | null; worktree_path: string | null;
/** pi session JSONL path persisted for this run (resumable by refinements). */
session_file: string | null;
prompt: string; prompt: string;
summary: string | null; summary: string | null;
token: string; token: string;

View File

@@ -153,3 +153,4 @@ ${describeCard(card)}
Begin now. Start by reading the relevant docs and code, then implement. Begin now. Start by reading the relevant docs and code, then implement.
`; `;
} }

View File

@@ -41,15 +41,28 @@ export interface RunnerOptions {
cwd: string; cwd: string;
/** Initial prompt. */ /** Initial prompt. */
prompt: string; prompt: string;
/**
* If set, resume (append to) this pi session JSONL file instead of starting a
* fresh conversation. Used by refinement runs to continue a prior run's chat.
*/
resumeSession?: string;
/** Called once when the run settles (status is final). */ /** Called once when the run settles (status is final). */
onSettled: (status: RunStatus, summary: string | null) => void; onSettled: (status: RunStatus, summary: string | null) => void;
/**
* Called with the persisted session JSONL path once pi reports it (fresh runs
* only — resumed runs already know their session file). Lets the manager
* record it so later refinements can resume this run's chat.
*/
onSessionResolved?: (sessionFile: string) => void;
} }
export class Runner { export class Runner {
readonly runId: string; readonly runId: string;
private readonly cwd: string; private readonly cwd: string;
private readonly prompt: string; private readonly prompt: string;
private readonly resumeSession: string | undefined;
private readonly onSettled: (status: RunStatus, summary: string | null) => void; private readonly onSettled: (status: RunStatus, summary: string | null) => void;
private readonly onSessionResolved: ((sessionFile: string) => void) | undefined;
private proc: ChildProcess | null = null; private proc: ChildProcess | null = null;
private listeners = new Set<Listener>(); private listeners = new Set<Listener>();
@@ -68,7 +81,9 @@ export class Runner {
this.runId = opts.runId; this.runId = opts.runId;
this.cwd = opts.cwd; this.cwd = opts.cwd;
this.prompt = opts.prompt; this.prompt = opts.prompt;
this.resumeSession = opts.resumeSession;
this.onSettled = opts.onSettled; this.onSettled = opts.onSettled;
this.onSessionResolved = opts.onSessionResolved;
} }
/** Subscribe to live slim events. Returns an unsubscribe function. */ /** Subscribe to live slim events. Returns an unsubscribe function. */
@@ -78,7 +93,10 @@ export class Runner {
} }
/** Begin the run: spawn pi and send the prompt. */ /** Begin the run: spawn pi and send the prompt. */
start(): void { start(): void {
const args = ['--mode', 'rpc', '--no-session', '--approve', '-n', `kanban:${this.runId.slice(0, 8)}`, '--tools', PI_TOOLS]; const args = ['--mode', 'rpc', '--approve', '-n', `kanban:${this.runId.slice(0, 8)}`, '--tools', PI_TOOLS];
// Refinement runs resume (append to) a prior run's session file; fresh runs
// persist a new session (no --no-session) so they can be refined later.
if (this.resumeSession) args.push('--session', this.resumeSession);
if (PI_MODEL) args.push('--model', PI_MODEL); if (PI_MODEL) args.push('--model', PI_MODEL);
this.proc = spawn(PI_BIN, args, { this.proc = spawn(PI_BIN, args, {
@@ -101,8 +119,14 @@ export class Runner {
this.proc.on('error', (err) => this.fail(`failed to spawn pi: ${err.message}`)); this.proc.on('error', (err) => this.fail(`failed to spawn pi: ${err.message}`));
this.proc.on('exit', (code, signal) => this.onExit(code, signal)); this.proc.on('exit', (code, signal) => this.onExit(code, signal));
// Capture the persisted session file path up front (fresh runs) so the
// manager can record it for later refinement. Resumed runs already know
// their session. Sent before the prompt so it resolves immediately.
if (!this.resumeSession && this.onSessionResolved) {
this.send({ type: 'get_state', id: 'init-state' });
}
this.send({ type: 'prompt', message: this.prompt }); this.send({ type: 'prompt', message: this.prompt });
this.emit({ type: 'log', level: 'info', text: `Started pi (cwd ${this.cwd})` }); this.emit({ type: 'log', level: 'info', text: `Started pi (cwd ${this.cwd})${this.resumeSession ? ' [resumed session]' : ''}` });
this.emit({ type: 'status', status: 'running' }); this.emit({ type: 'status', status: 'running' });
} }
@@ -171,7 +195,17 @@ export class Runner {
} }
switch (msg.type) { switch (msg.type) {
case 'response': case 'response':
// Command ack; surface failures. // Capture the persisted session file from the init get_state call so the
// manager can record it for later refinement runs.
if (
msg.id === 'init-state' &&
msg.success === true &&
this.onSessionResolved
) {
const sf = (msg.data as { sessionFile?: string } | undefined)?.sessionFile;
if (typeof sf === 'string' && sf) this.onSessionResolved(sf);
}
// Surface command failures.
if (msg.success === false) { if (msg.success === false) {
this.emit({ type: 'log', level: 'error', text: `command ${String(msg.command ?? '?')} rejected` }); this.emit({ type: 'log', level: 'error', text: `command ${String(msg.command ?? '?')} rejected` });
} }

View File

@@ -25,6 +25,13 @@ export interface StartRunInput {
useWorktree?: boolean; useWorktree?: boolean;
/** If true, delete the worktree + branch when the run settles. */ /** If true, delete the worktree + branch when the run settles. */
cleanupOnFinish?: boolean; cleanupOnFinish?: boolean;
/**
* If set, start a refinement run: a fresh worktree branched from this prior
* run's branch (so its commits/work are present), seeded with `prompt` as the
* operator's refinement feedback. Only valid when the prior run is settled
* and its worktree is still present.
*/
refineRunId?: string;
} }
class RunManager { class RunManager {
@@ -40,37 +47,100 @@ class RunManager {
const useWorktree = input.useWorktree ?? true; const useWorktree = input.useWorktree ?? true;
const now = new Date().toISOString(); const now = new Date().toISOString();
// Provision the worktree (or fall back to the repo root). // A refinement appends to a prior run's chat: it resumes that run's pi
// session in the SAME worktree (so the working tree matches the
// conversation) and is itself NOT the worktree's owner. Resolve it first so
// any error surfaces before we create anything.
let refineOf: { worktree: Worktree; sessionFile: string } | null = null;
if (input.refineRunId) {
if (!useWorktree) throw new Error('a refinement run requires a worktree');
const prior = this.get(input.refineRunId);
if (!prior) throw new Error(`refinement source run not found: ${input.refineRunId}`);
if (prior.cardId !== card.id) throw new Error('refinement source run belongs to a different card');
if (
!prior.ownsWorktree ||
!prior.worktreePath ||
!prior.branch ||
!isWorktreePresent(prior.worktreePath)
) {
throw new Error('refinement source run has no worktree (it was cleaned up); start a fresh run instead');
}
if (!prior.sessionFile) {
throw new Error('refinement source run has no recorded session to resume');
}
refineOf = {
worktree: { path: prior.worktreePath, branch: prior.branch },
sessionFile: prior.sessionFile,
};
}
// Provision the worktree. Refinement runs inherit the prior run's worktree
// (and never own it); fresh runs create their own (or fall back to repo root).
let worktree: Worktree | null = null; let worktree: Worktree | null = null;
let branch: string | null = null; let branch: string | null = null;
let worktreePath: string | null = null; let worktreePath: string | null = null;
if (useWorktree) { if (refineOf) {
worktree = refineOf.worktree;
branch = refineOf.worktree.branch;
worktreePath = refineOf.worktree.path;
} else if (useWorktree) {
worktree = createWorktree(id, card.id); worktree = createWorktree(id, card.id);
branch = worktree.branch; branch = worktree.branch;
worktreePath = worktree.path; worktreePath = worktree.path;
} }
const ownsWorktree = !refineOf;
const prompt = buildPrompt(card, { token, runId: id }, input.prompt); // Fresh runs get the full agent contract prompt; refinement runs just send
// the operator's feedback as a new user turn in the resumed session (the
// prior system prompt + conversation are already in the session history).
const prompt = refineOf
? input.prompt?.trim() || '(no specific changes requested — review the work so far and improve it).'
: buildPrompt(card, { token, runId: id }, input.prompt);
try { try {
db.prepare( db.prepare(
`INSERT INTO agent_runs `INSERT INTO agent_runs
(id, card_id, status, use_worktree, branch, worktree_path, prompt, token, created_at, started_at) (id, card_id, status, use_worktree, owns_worktree, branch, worktree_path, session_file, prompt, token, created_at, started_at)
VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?, ?)`, VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
).run(id, card.id, useWorktree ? 1 : 0, branch, worktreePath, prompt, token, now, now); ).run(
id,
card.id,
useWorktree ? 1 : 0,
ownsWorktree ? 1 : 0,
branch,
worktreePath,
refineOf?.sessionFile ?? null,
prompt,
token,
now,
now,
);
const cwd = worktreePath ?? REPO_ROOT; const cwd = worktreePath ?? REPO_ROOT;
const runner = new Runner({ const runner = new Runner({
runId: id, runId: id,
cwd, cwd,
prompt, prompt,
resumeSession: refineOf?.sessionFile,
onSettled: (status, summary) => this.settle(id, status, summary), onSettled: (status, summary) => this.settle(id, status, summary),
// Only fresh runs need to discover + persist their session file; resumed
// runs already point at the prior session via session_file above.
onSessionResolved: refineOf
? undefined
: (sessionFile) => {
db.prepare('UPDATE agent_runs SET session_file = ? WHERE id = ?').run(sessionFile, id);
},
});
// A refinement run never cleans up the worktree it inherited.
this.active.set(id, {
runner,
worktree,
cleanup: ownsWorktree && (input.cleanupOnFinish ?? false),
}); });
this.active.set(id, { runner, worktree, cleanup: input.cleanupOnFinish ?? false });
runner.start(); runner.start();
} catch (err) { } catch (err) {
// Persisting or starting failed — reclaim the worktree so we never leak one. // Persisting or starting failed — reclaim only a worktree we created.
if (worktree) removeWorktree(id, worktree.branch); if (worktree && ownsWorktree) removeWorktree(id, worktree.branch);
throw err; throw err;
} }
@@ -111,7 +181,9 @@ class RunManager {
this.bevy.get(id)?.stop(); this.bevy.get(id)?.stop();
this.bevy.delete(id); this.bevy.delete(id);
const row = this.get(id); const row = this.get(id);
if (row?.worktreePath && row.branch) removeWorktree(id, row.branch); // Only the run that created a worktree may reclaim it; refinement runs
// inherit theirs and must never delete the owning run's worktree.
if (row?.ownsWorktree && row.worktreePath && row.branch) removeWorktree(id, row.branch);
db.prepare('DELETE FROM agent_run_events WHERE run_id = ?').run(id); db.prepare('DELETE FROM agent_run_events WHERE run_id = ?').run(id);
db.prepare('DELETE FROM agent_runs WHERE id = ?').run(id); db.prepare('DELETE FROM agent_runs WHERE id = ?').run(id);
} }
@@ -237,8 +309,10 @@ function hydrateRun(row: AgentRunRow): AgentRun { return {
cardId: row.card_id, cardId: row.card_id,
status: row.status as RunStatus, status: row.status as RunStatus,
useWorktree: row.use_worktree === 1, useWorktree: row.use_worktree === 1,
ownsWorktree: row.owns_worktree === 1,
branch: row.branch, branch: row.branch,
worktreePath: row.worktree_path, worktreePath: row.worktree_path,
sessionFile: row.session_file,
prompt: row.prompt, prompt: row.prompt,
summary: row.summary, summary: row.summary,
commitSha: row.commit_sha, commitSha: row.commit_sha,

View File

@@ -58,13 +58,16 @@ orchestrator.post('/runs', async (c) => {
const prompt = typeof body.prompt === 'string' ? body.prompt : undefined; const prompt = typeof body.prompt === 'string' ? body.prompt : undefined;
const useWorktree = body.useWorktree !== false; // default true const useWorktree = body.useWorktree !== false; // default true
const cleanupOnFinish = body.cleanupOnFinish === true; const cleanupOnFinish = body.cleanupOnFinish === true;
const refineRunId = typeof body.refineRunId === 'string' ? body.refineRunId : undefined;
try { try {
const run = runManager.start(card, { cardId, prompt, useWorktree, cleanupOnFinish }); const run = runManager.start(card, { cardId, prompt, useWorktree, cleanupOnFinish, refineRunId });
return c.json({ run }, 201); return c.json({ run }, 201);
} catch (err) { } catch (err) {
const message = err instanceof Error ? err.message : 'failed to start run'; const message = err instanceof Error ? err.message : 'failed to start run';
return c.json({ error: message }, 500); // A bad refinement target (missing run / cleaned-up worktree) is a client error.
const status = message.startsWith('refinement') ? 409 : 500;
return c.json({ error: message }, status);
} }
}); });

View File

@@ -90,8 +90,13 @@ export interface AgentRun {
cardId: string; cardId: string;
status: RunStatus; status: RunStatus;
useWorktree: boolean; useWorktree: boolean;
/** Whether this run created (and therefore owns/cleans up) its worktree.
* Refinement runs inherit a prior run's worktree and set this to false. */
ownsWorktree: boolean;
branch: string | null; branch: string | null;
worktreePath: string | null; worktreePath: string | null;
/** pi session JSONL path persisted for this run (resumable by refinements). */
sessionFile: string | null;
prompt: string; prompt: string;
summary: string | null; summary: string | null;
commitSha: string | null; commitSha: string | null;

View File

@@ -3,6 +3,7 @@ import type { Card } from '../../lib/kanbanApi';
import type { UseOrchestrator } from './useOrchestrator'; import type { UseOrchestrator } from './useOrchestrator';
import { DiffModal } from './DiffModal'; import { DiffModal } from './DiffModal';
import { RunEventList } from './RunEventList'; import { RunEventList } from './RunEventList';
import { useRunStream } from './useRunStream';
/** /**
* Inline agentic console for one kanban card. * Inline agentic console for one kanban card.
@@ -41,6 +42,8 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) {
const [promptDraft, setPromptDraft] = useState(''); const [promptDraft, setPromptDraft] = useState('');
const [steerDraft, setSteerDraft] = useState(''); const [steerDraft, setSteerDraft] = useState('');
const [showPrompt, setShowPrompt] = useState(false); const [showPrompt, setShowPrompt] = useState(false);
const [refineDraft, setRefineDraft] = useState('');
const [showRefine, setShowRefine] = useState(false);
const [showDiff, setShowDiff] = useState(false); const [showDiff, setShowDiff] = useState(false);
const [mergeResult, setMergeResult] = useState<{ ok: boolean; text: string } | null>(null); const [mergeResult, setMergeResult] = useState<{ ok: boolean; text: string } | null>(null);
const [mergeBusy, setMergeBusy] = useState(false); const [mergeBusy, setMergeBusy] = useState(false);
@@ -50,13 +53,8 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) {
const bevyRunning = run ? orch.bevyIsRunning(run.id) : false; const bevyRunning = run ? orch.bevyIsRunning(run.id) : false;
// Stream the run's events while it exists (agent run OR a later Bevy test); // (The live event stream is owned locally by `useRunStream` below, so a burst
// the card stays expanded so playtest output keeps flowing after settle. // of agent events re-renders only this console — never the board or modal.)
useEffect(() => {
if (!run) return;
orch.watch(run.id);
return () => orch.unwatch(run.id);
}, [run, orch]);
// Sync Bevy status from the server when the run changes (truth after a // Sync Bevy status from the server when the run changes (truth after a
// reconnect/refresh; live lifecycle is mirrored via `bevy` events). // reconnect/refresh; live lifecycle is mirrored via `bevy` events).
@@ -64,7 +62,7 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) {
if (run) void orch.refreshBevyStatus(run.id); if (run) void orch.refreshBevyStatus(run.id);
}, [run, orch]); }, [run, orch]);
const events = run ? orch.eventsForRun(run.id) : []; const events = useRunStream(run?.id ?? null, orch);
useEffect(() => { useEffect(() => {
if (logRef.current) logRef.current.scrollTop = logRef.current.scrollHeight; if (logRef.current) logRef.current.scrollTop = logRef.current.scrollHeight;
}, [events.length]); }, [events.length]);
@@ -93,6 +91,29 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) {
} }
}; };
/**
* Start a refinement run: continues this run's branch (prior commits present)
* seeded with the operator's feedback. The new run replaces this one in the
* console since `runForCard` returns the active run.
*/
const refine = async () => {
if (!run) return;
setBusy(true);
try {
await orch.start({
cardId: card.id,
prompt: refineDraft.trim() || undefined,
refineRunId: run.id,
});
setRefineDraft('');
setShowRefine(false);
} catch (e) {
alert(e instanceof Error ? e.message : 'Failed to start refinement');
} finally {
setBusy(false);
}
};
const steer = async () => { const steer = async () => {
if (!run || !steerDraft.trim()) return; if (!run || !steerDraft.trim()) return;
setBusy(true); setBusy(true);
@@ -173,9 +194,29 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) {
)} )}
{settled && ( {settled && (
<> <>
<button type="button" style={ghostBtn} onClick={() => setShowPrompt((s) => !s)}> <button
type="button"
style={ghostBtn}
onClick={() => {
setShowPrompt((s) => !s);
setShowRefine(false);
}}
>
Re-run Re-run
</button> </button>
{hasWorktree && run.sessionFile && (
<button
type="button"
style={accentBtn}
onClick={() => {
setShowRefine((s) => !s);
setShowPrompt(false);
}}
title="Continue this run's conversation with feedback"
>
Refine
</button>
)}
<button type="button" style={ghostBtn} onClick={dismiss} title="Remove this run"> <button type="button" style={ghostBtn} onClick={dismiss} title="Remove this run">
Dismiss Dismiss
</button> </button>
@@ -219,6 +260,33 @@ export function AgentRunBar({ card, orch }: AgentRunBarProps) {
</div> </div>
)} )}
{/* Refinement: resume this run's conversation with operator feedback */}
{showRefine && settled && hasWorktree && run.sessionFile && (
<div style={blockStyle}>
<textarea
placeholder="Ask for changes — this continues the same conversation (e.g. 'the jump feels too floaty', 'also handle the empty-list case', 'split the large function')…"
value={refineDraft}
onChange={(e) => setRefineDraft(e.target.value)}
onClick={(e) => e.stopPropagation()}
style={{ ...inputStyle, width: '100%', minHeight: '60px', resize: 'vertical' }}
spellCheck={false}
autoFocus
/>
<div style={{ marginTop: '6px', display: 'flex', gap: '6px', alignItems: 'center' }}>
<button type="button" style={primaryBtn} onClick={refine} disabled={busy}>
{busy ? 'Starting…' : '✎ Refine'}
</button>
<button type="button" style={ghostBtn} onClick={() => setShowRefine(false)}>
Cancel
</button>
<span style={hintStyle}>
Resumes this run's chat in the same worktree — the agent keeps the
full prior context and appends your request as the next message.
</span>
</div>
</div>
)}
{/* Steer box while running */} {/* Steer box while running */}
{isActive && ( {isActive && (
<div style={{ ...blockStyle, display: 'flex', gap: '6px' }}> <div style={{ ...blockStyle, display: 'flex', gap: '6px' }}>

View File

@@ -4,16 +4,21 @@ import {
type AgentRun, type AgentRun,
type DiffResult, type DiffResult,
type MergeResult, type MergeResult,
type RunEvent,
} from '../../lib/orchestratorApi'; } from '../../lib/orchestratorApi';
/** /**
* Drives the agentic orchestrator from the board UI. * Shared run registry for the implementation board.
* *
* Holds the latest run per card plus a live, replayable event log for any run * Owns the lightweight, board-level slice of orchestrator state: the run list,
* currently being "watched" (typically the expanded card's active run). Live * Bevy-playtest flags, and the derived active-run index. It deliberately does
* updates arrive over Server-Sent Events; the board stays fully interactive * NOT hold the streaming event log — that lives in `useRunStream`, scoped to the
* while an agent works a card — start, steer, and stop at any time. * run console (`AgentRunBar`), so a burst of agent events re-renders only the
* console and never the board page or card modal. Lifecycle changes observed in
* a stream (status / bevy / done) are pushed back here via `patchRun` /
* `reflectBevy` so the board's active indicators stay correct.
*
* The returned object is memoized on its state/callbacks, so its identity is
* stable between registry lifecycle changes (not, e.g., on every render).
*/ */
export interface UseOrchestrator { export interface UseOrchestrator {
@@ -23,24 +28,22 @@ export interface UseOrchestrator {
isRunning: (cardId: string) => boolean; isRunning: (cardId: string) => boolean;
/** All known runs (newest first), for a global activity view. */ /** All known runs (newest first), for a global activity view. */
runs: AgentRun[]; runs: AgentRun[];
/** Watched events keyed by run id (ordered). */
eventsForRun: (runId: string) => RunEvent[];
/** Load initial state. */ /** Load initial state. */
reload: () => Promise<void>; reload: () => Promise<void>;
loading: boolean; loading: boolean;
error: string | null; error: string | null;
/** Begin a run for a card. Returns the created run. */ /** Begin a run for a card. Returns the created run. */
start: (input: { cardId: string; prompt?: string }) => Promise<AgentRun>; start: (input: { cardId: string; prompt?: string; refineRunId?: string }) => Promise<AgentRun>;
/** Send a steer/follow-up message to an active run. */ /** Send a steer/follow-up message to an active run. */
message: (runId: string, text: string, mode: 'steer' | 'followUp') => Promise<void>; message: (runId: string, text: string, mode: 'steer' | 'followUp') => Promise<void>;
/** Stop an active run. */ /** Stop an active run. */
stop: (runId: string) => Promise<void>; stop: (runId: string) => Promise<void>;
/** Open the live event stream for a run (ref-counted; safe to call repeatedly). */
watch: (runId: string) => void;
/** Release a watch (ref-counted; closes the stream when the last watcher leaves). */
unwatch: (runId: string) => void;
/** Remove a settled run from the UI (and reclaim its worktree). */ /** Remove a settled run from the UI (and reclaim its worktree). */
remove: (runId: string) => Promise<void>; remove: (runId: string) => Promise<void>;
/** Apply a partial update to a run record (used by stream reflectors). */
patchRun: (runId: string, patch: Partial<AgentRun>) => void;
/** Reflect a Bevy playtest lifecycle change (used by stream reflectors). */
reflectBevy: (runId: string, running: boolean) => void;
/** Fetch a run's branch diff vs main. */ /** Fetch a run's branch diff vs main. */
getDiff: (runId: string) => Promise<DiffResult>; getDiff: (runId: string) => Promise<DiffResult>;
/** Merge a run's branch into the main worktree. */ /** Merge a run's branch into the main worktree. */
@@ -63,16 +66,10 @@ export interface UseOrchestrator {
export function useOrchestrator(): UseOrchestrator { export function useOrchestrator(): UseOrchestrator {
const [runs, setRuns] = useState<AgentRun[]>([]); const [runs, setRuns] = useState<AgentRun[]>([]);
const [eventsByRun, setEventsByRun] = useState<Record<string, RunEvent[]>>({});
const [bevyRunning, setBevyRunning] = useState<Set<string>>(new Set()); const [bevyRunning, setBevyRunning] = useState<Set<string>>(new Set());
const [loading, setLoading] = useState(true); const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
// Refcounted EventSource subscriptions + replay cursors, kept in refs so the
// SSE callbacks always see fresh state without re-subscribing.
const sources = useRef(new Map<string, EventSource>());
const refcounts = useRef(new Map<string, number>());
const cursors = useRef(new Map<string, number>());
// Signature of the last loaded run list, so the background poll can skip // Signature of the last loaded run list, so the background poll can skip
// state updates (and re-renders) when nothing actually changed. // state updates (and re-renders) when nothing actually changed.
const lastSig = useRef(''); const lastSig = useRef('');
@@ -101,9 +98,10 @@ export function useOrchestrator(): UseOrchestrator {
void reload(); void reload();
}, [reload]); }, [reload]);
// Background poll keeps run status fresh even when no SSE stream is open // Background poll keeps run status fresh even when no card modal is open.
// (i.e. when no card modal is open). Poll faster while any run is active so a // Poll faster while any run is active so a collapsed card's running indicator
// collapsed card's running indicator turns over promptly when it settles. // turns over promptly when it settles. (The live event stream, when a modal is
// open, is the primary updater; this is a liveness backstop.)
const anyRunning = useMemo(() => runs.some((r) => r.status === 'running'), [runs]); const anyRunning = useMemo(() => runs.some((r) => r.status === 'running'), [runs]);
useEffect(() => { useEffect(() => {
const ms = anyRunning ? 3_000 : 10_000; const ms = anyRunning ? 3_000 : 10_000;
@@ -121,17 +119,8 @@ export function useOrchestrator(): UseOrchestrator {
}); });
}, []); }, []);
const appendEvent = useCallback((runId: string, ev: RunEvent) => {
setEventsByRun((prev) => {
const cur = prev[runId] ?? [];
// Dedup by seq when present (history flush vs. live may overlap).
if (typeof ev.seq === 'number' && cur.some((e) => e.seq === ev.seq)) return prev;
return { ...prev, [runId]: [...cur, ev] };
});
}, []);
const start = useCallback( const start = useCallback(
async (input: { cardId: string; prompt?: string }) => { async (input: { cardId: string; prompt?: string; refineRunId?: string }) => {
const { run } = await orchestratorApi.startRun(input); const { run } = await orchestratorApi.startRun(input);
upsertRun(run); upsertRun(run);
return run; return run;
@@ -146,106 +135,30 @@ export function useOrchestrator(): UseOrchestrator {
[], [],
); );
const stop = useCallback( const stop = useCallback(async (runId: string) => {
async (runId: string) => { await orchestratorApi.stopRun(runId);
await orchestratorApi.stopRun(runId); }, []);
},
[],
);
const remove = useCallback(async (runId: string) => { const remove = useCallback(async (runId: string) => {
await orchestratorApi.deleteRun(runId); await orchestratorApi.deleteRun(runId);
setRuns((prev) => prev.filter((r) => r.id !== runId)); setRuns((prev) => prev.filter((r) => r.id !== runId));
setEventsByRun((prev) => { }, []);
const next = { ...prev };
delete next[runId]; /** Apply a partial update to a run (status/summary/etc., from stream events). */
const patchRun = useCallback((runId: string, patch: Partial<AgentRun>) => {
setRuns((prev) => prev.map((r) => (r.id === runId ? { ...r, ...patch } : r)));
}, []);
/** Reflect a Bevy playtest lifecycle change (from stream events). */
const reflectBevy = useCallback((runId: string, running: boolean) => {
setBevyRunning((prev) => {
const next = new Set(prev);
if (running) next.add(runId);
else next.delete(runId);
return next; return next;
}); });
}, []); }, []);
/** (Re)open the SSE stream for a run, replaying persisted history first. */
const openStream = useCallback(
(runId: string) => {
if (sources.current.has(runId)) return;
const since = cursors.current.get(runId) ?? 0;
const es = new EventSource(`/api/orchestrator/runs/${runId}/stream?since=${since}`);
sources.current.set(runId, es);
es.addEventListener('event', (msg) => {
const ev = JSON.parse((msg as MessageEvent).data) as RunEvent;
if (typeof ev.seq === 'number') cursors.current.set(runId, ev.seq);
appendEvent(runId, ev);
// Reflect status changes onto the run record.
if (ev.type === 'status') {
const status = ev.data.status as AgentRun['status'];
setRuns((prev) =>
prev.map((r) =>
r.id === runId
? {
...r,
status,
finishedAt: ['completed', 'failed', 'stopped'].includes(status)
? new Date().toISOString()
: r.finishedAt,
}
: r,
),
);
}
if (ev.type === 'done' && typeof ev.data.summary === 'string') {
setRuns((prev) =>
prev.map((r) => (r.id === runId ? { ...r, summary: ev.data.summary as string } : r)),
);
}
// Track Bevy playtest lifecycle from its events.
if (ev.type === 'bevy') {
const phase = ev.data.phase;
setBevyRunning((prev) => {
const next = new Set(prev);
if (phase === 'start') next.add(runId);
else if (phase === 'end') next.delete(runId);
return next;
});
}
});
es.onerror = () => {
// EventSource auto-reconnects; nothing to do here.
};
},
[appendEvent],
);
const watch = useCallback(
(runId: string) => {
const n = (refcounts.current.get(runId) ?? 0) + 1;
refcounts.current.set(runId, n);
if (n === 1) openStream(runId);
},
[openStream],
);
const unwatch = useCallback((runId: string) => {
const n = (refcounts.current.get(runId) ?? 0) - 1;
if (n > 0) {
refcounts.current.set(runId, n);
return;
}
refcounts.current.delete(runId);
const es = sources.current.get(runId);
if (es) {
es.close();
sources.current.delete(runId);
}
}, []);
// Close all streams on unmount.
useEffect(() => {
return () => {
for (const es of sources.current.values()) es.close();
sources.current.clear();
refcounts.current.clear();
};
}, []);
const runForCard = useCallback( const runForCard = useCallback(
(cardId: string) => { (cardId: string) => {
const forCard = runs.filter((r) => r.cardId === cardId); const forCard = runs.filter((r) => r.cardId === cardId);
@@ -260,17 +173,8 @@ export function useOrchestrator(): UseOrchestrator {
[runs], [runs],
); );
const eventsForRun = useCallback((runId: string) => eventsByRun[runId] ?? [], [eventsByRun]); const getDiff = useCallback((runId: string) => orchestratorApi.getDiff(runId), []);
const mergeRun = useCallback((runId: string) => orchestratorApi.mergeRun(runId), []);
const getDiff = useCallback(
(runId: string) => orchestratorApi.getDiff(runId),
[],
);
const mergeRun = useCallback(
(runId: string) => orchestratorApi.mergeRun(runId),
[],
);
const startBevy = useCallback(async (runId: string) => { const startBevy = useCallback(async (runId: string) => {
await orchestratorApi.startBevy(runId); await orchestratorApi.startBevy(runId);
@@ -292,7 +196,6 @@ export function useOrchestrator(): UseOrchestrator {
/** /**
* Card-id index of active runs. Recomputed only when `runs` or `bevyRunning` * Card-id index of active runs. Recomputed only when `runs` or `bevyRunning`
* changes — NOT on every streamed event — so memoized consumers stay stable. * changes — NOT on every streamed event — so memoized consumers stay stable.
* Replaces the per-card `.filter().find()` scans the board used to do.
*/ */
const activeByCard = useMemo(() => { const activeByCard = useMemo(() => {
const m = new Map<string, { running: boolean; bevy: boolean; runId: string }>(); const m = new Map<string, { running: boolean; bevy: boolean; runId: string }>();
@@ -318,26 +221,50 @@ export function useOrchestrator(): UseOrchestrator {
} }
}, []); }, []);
return { // Stable identity: re-created only when registry state changes, so consumers
runForCard, // (and the per-run stream effect) don't churn on unrelated renders.
isRunning, return useMemo<UseOrchestrator>(
runs, () => ({
eventsForRun, runForCard,
reload, isRunning,
loading, runs,
error, reload,
start, loading,
message, error,
stop, start,
watch, message,
unwatch, stop,
remove, remove,
getDiff, patchRun,
mergeRun, reflectBevy,
startBevy, getDiff,
stopBevy, mergeRun,
bevyIsRunning, startBevy,
refreshBevyStatus, stopBevy,
activeByCard, bevyIsRunning,
}; refreshBevyStatus,
activeByCard,
}),
[
runForCard,
isRunning,
runs,
reload,
loading,
error,
start,
message,
stop,
remove,
patchRun,
reflectBevy,
getDiff,
mergeRun,
startBevy,
stopBevy,
bevyIsRunning,
refreshBevyStatus,
activeByCard,
],
);
} }

View File

@@ -0,0 +1,71 @@
import { useEffect, useState } from 'react';
import type { AgentRun, RunEvent } from '../../lib/orchestratorApi';
import type { UseOrchestrator } from './useOrchestrator';
/**
* Live event stream for a single agent run, owned locally by the run console
* (`AgentRunBar`).
*
* Keeping the stream — and the event log it accumulates — here, rather than in
* the board-level `useOrchestrator`, means a burst of streamed events re-renders
* ONLY the console. The board page and card modal are untouched by per-event
* updates, which is what eliminates the frame drops at run start.
*
* Lifecycle events (status / bevy / done) are reflected back into the shared
* run registry via `patchRun` / `reflectBevy`, so the board's active indicators
* and the run record stay correct. The server replays persisted history first
* (`since=0`), so opening a modal shows the run's prior activity; duplicates are
* de-duped by `seq`.
*
* Returns the ordered events for `runId` (empty while `runId` is null).
*/
export function useRunStream(runId: string | null, orch: UseOrchestrator): RunEvent[] {
// Pull the two reflectors out so the effect can depend on THESE stable
// callbacks (each is `useCallback(..., [])`) rather than the whole `orch`
// object — whose identity changes on every registry mutation. Depending on
// `orch` directly would tear down + reopen the EventSource (and wipe the log)
// on every status/bevy/done event, looping forever. See `patchRun`/
// `reflectBevy` in `useOrchestrator`.
const { patchRun, reflectBevy } = orch;
const [events, setEvents] = useState<RunEvent[]>([]);
useEffect(() => {
// Reset when switching runs (or closing the console).
setEvents([]);
if (!runId) return;
const es = new EventSource(`/api/orchestrator/runs/${runId}/stream?since=0`);
const onEvent = (msg: Event) => {
const ev = JSON.parse((msg as MessageEvent).data) as RunEvent;
setEvents((prev) => {
// Dedup by seq (history replay vs. live may overlap).
if (typeof ev.seq === 'number' && prev.some((e) => e.seq === ev.seq)) return prev;
return [...prev, ev];
});
// Reflect lifecycle events back to the shared registry.
if (ev.type === 'status') {
const status = ev.data.status as AgentRun['status'];
const settling = ['completed', 'failed', 'stopped'].includes(status);
patchRun(runId, settling ? { status, finishedAt: new Date().toISOString() } : { status });
} else if (ev.type === 'done' && typeof ev.data.summary === 'string') {
patchRun(runId, { summary: ev.data.summary });
} else if (ev.type === 'bevy') {
const phase = ev.data.phase;
if (phase === 'start') reflectBevy(runId, true);
else if (phase === 'end') reflectBevy(runId, false);
}
};
es.addEventListener('event', onEvent);
// EventSource auto-reconnects on error; nothing to do on onerror.
return () => {
es.removeEventListener('event', onEvent);
es.close();
};
}, [runId, patchRun, reflectBevy]);
return events;
}

View File

@@ -17,8 +17,12 @@ export interface AgentRun {
cardId: string; cardId: string;
status: RunStatus; status: RunStatus;
useWorktree: boolean; useWorktree: boolean;
/** Whether this run created (and therefore owns/cleans up) its worktree. */
ownsWorktree: boolean;
branch: string | null; branch: string | null;
worktreePath: string | null; worktreePath: string | null;
/** pi session JSONL path persisted for this run (resumable by refinements). */
sessionFile: string | null;
prompt: string; prompt: string;
summary: string | null; summary: string | null;
commitSha: string | null; commitSha: string | null;
@@ -78,6 +82,11 @@ export interface StartRunInput {
prompt?: string; prompt?: string;
useWorktree?: boolean; useWorktree?: boolean;
cleanupOnFinish?: boolean; cleanupOnFinish?: boolean;
/**
* If set, start a refinement run that continues this prior run's branch with
* `prompt` as the operator's refinement feedback.
*/
refineRunId?: string;
} }
/** A commit on a run's branch that is not yet on main. */ /** A commit on a run's branch that is not yet on main. */