feat(api): add backend with kanban, docs store, and orchestrator
Introduce the @void-nav/api Hono + SQLite backend that powers the docs site: a persisted implementation board (kanban), a custom documentation-pages store with AI beautify, and an agentic orchestrator that runs `pi` agents per card. The orchestrator spawns `pi --mode rpc` inside an isolated git worktree per run, streams slim events over SSE, and lets the agent drive the board/docs via token-gated internal endpoints (all SQLite writes stay in-process). Interrupted runs are reconciled to "stopped" on boot. Workspace wiring: root `dev:api`/`dev:web` scripts with `concurrently`, the docs Vite `/api` proxy, and `.worktrees/` gitignore.
This commit is contained in:
244
apps/api/src/orchestrator/runs.ts
Normal file
244
apps/api/src/orchestrator/runs.ts
Normal file
@@ -0,0 +1,244 @@
|
||||
/**
|
||||
* Run manager: the single source of truth for orchestrator runs.
|
||||
*
|
||||
* Each run works one kanban card inside an (optional) isolated git worktree.
|
||||
* The manager persists run state to SQLite, keeps active `Runner` instances in
|
||||
* memory (so routes can steer/stop/stream them), and finalizes the run record
|
||||
* when a runner settles — capturing the head commit and a work summary.
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { db, type AgentRunRow } from '../db.js';
|
||||
import type { AgentRun, AgentRunEvent, Card, RunStatus } from '../types.js';
|
||||
import { newToken, REPO_ROOT } from './config.js';
|
||||
import { buildPrompt } from './prompt.js';
|
||||
import { Runner, type PersistedEvent } from './runner.js';
|
||||
import { createWorktree, dirtySummary, headSha, removeWorktree, type Worktree } from './worktrees.js';
|
||||
|
||||
export interface StartRunInput {
|
||||
cardId: string;
|
||||
/** Extra operator instructions appended to the agent's prompt. */
|
||||
prompt?: string;
|
||||
/** Default true: run inside an isolated git worktree. */
|
||||
useWorktree?: boolean;
|
||||
/** If true, delete the worktree + branch when the run settles. */
|
||||
cleanupOnFinish?: boolean;
|
||||
}
|
||||
|
||||
class RunManager {
|
||||
/** Active runners keyed by run id. */
|
||||
private active = new Map<string, { runner: Runner; worktree: Worktree | null; cleanup: boolean }>();
|
||||
|
||||
/** Create and start a run for a card. Returns the persisted run. */
|
||||
start(card: Card, input: StartRunInput): AgentRun {
|
||||
const id = randomUUID();
|
||||
const token = newToken();
|
||||
const useWorktree = input.useWorktree ?? true;
|
||||
const now = new Date().toISOString();
|
||||
|
||||
// Provision the worktree (or fall back to the repo root).
|
||||
let worktree: Worktree | null = null;
|
||||
let branch: string | null = null;
|
||||
let worktreePath: string | null = null;
|
||||
if (useWorktree) {
|
||||
worktree = createWorktree(id, card.id);
|
||||
branch = worktree.branch;
|
||||
worktreePath = worktree.path;
|
||||
}
|
||||
|
||||
const prompt = buildPrompt(card, { token, runId: id }, input.prompt);
|
||||
|
||||
try {
|
||||
db.prepare(
|
||||
`INSERT INTO agent_runs
|
||||
(id, card_id, status, use_worktree, branch, worktree_path, prompt, token, created_at, started_at)
|
||||
VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?, ?)`,
|
||||
).run(id, card.id, useWorktree ? 1 : 0, branch, worktreePath, prompt, token, now, now);
|
||||
|
||||
const cwd = worktreePath ?? REPO_ROOT;
|
||||
const runner = new Runner({
|
||||
runId: id,
|
||||
cwd,
|
||||
prompt,
|
||||
onSettled: (status, summary) => this.settle(id, status, summary),
|
||||
});
|
||||
this.active.set(id, { runner, worktree, cleanup: input.cleanupOnFinish ?? false });
|
||||
runner.start();
|
||||
} catch (err) {
|
||||
// Persisting or starting failed — reclaim the worktree so we never leak one.
|
||||
if (worktree) removeWorktree(id, worktree.branch);
|
||||
throw err;
|
||||
}
|
||||
|
||||
return this.get(id)!;
|
||||
}
|
||||
|
||||
/** Whether a run is currently active (steerable / stoppable). */
|
||||
isActive(id: string): boolean {
|
||||
return this.active.has(id);
|
||||
}
|
||||
|
||||
/** Send a steer/follow-up message to an active run. */
|
||||
message(id: string, text: string, mode: 'steer' | 'followUp'): void {
|
||||
const entry = this.active.get(id);
|
||||
if (!entry) throw new RunNotFoundError(id);
|
||||
entry.runner.message(text, mode);
|
||||
}
|
||||
|
||||
/** Abort an active run. */
|
||||
stop(id: string): void {
|
||||
const entry = this.active.get(id);
|
||||
if (!entry) throw new RunNotFoundError(id);
|
||||
entry.runner.stop();
|
||||
}
|
||||
|
||||
/** Subscribe to a run's live slim-event stream (best-effort; may be inactive). */
|
||||
subscribe(id: string, fn: (event: PersistedEvent) => void): () => void {
|
||||
const entry = this.active.get(id);
|
||||
if (entry) return entry.runner.subscribe(fn);
|
||||
return () => {};
|
||||
}
|
||||
|
||||
/** Delete a run record (and its worktree if still present). */
|
||||
remove(id: string): void {
|
||||
const entry = this.active.get(id);
|
||||
if (entry) throw new Error('cannot delete an active run; stop it first');
|
||||
const row = this.get(id);
|
||||
if (row?.worktreePath && row.branch) removeWorktree(id, row.branch);
|
||||
db.prepare('DELETE FROM agent_run_events WHERE run_id = ?').run(id);
|
||||
db.prepare('DELETE FROM agent_runs WHERE id = ?').run(id);
|
||||
}
|
||||
|
||||
// --- reads ---------------------------------------------------------------
|
||||
|
||||
get(id: string): AgentRun | undefined {
|
||||
const row = db.prepare('SELECT * FROM agent_runs WHERE id = ?').get(id) as AgentRunRow | undefined;
|
||||
return row ? hydrateRun(row) : undefined;
|
||||
}
|
||||
|
||||
/** Resolve a run by its agent token (used to authorize internal calls). */
|
||||
getByToken(token: string): AgentRun | undefined {
|
||||
const row = db
|
||||
.prepare('SELECT * FROM agent_runs WHERE token = ? ORDER BY created_at DESC LIMIT 1')
|
||||
.get(token) as AgentRunRow | undefined;
|
||||
return row ? hydrateRun(row) : undefined;
|
||||
}
|
||||
|
||||
/** Recent runs, newest first. */
|
||||
list(limit = 50): AgentRun[] {
|
||||
const rows = db
|
||||
.prepare('SELECT * FROM agent_runs ORDER BY created_at DESC LIMIT ?')
|
||||
.all(limit) as AgentRunRow[];
|
||||
return rows.map(hydrateRun);
|
||||
}
|
||||
|
||||
/** Runs for a given card (newest first). */
|
||||
listForCard(cardId: string): AgentRun[] {
|
||||
const rows = db
|
||||
.prepare('SELECT * FROM agent_runs WHERE card_id = ? ORDER BY created_at DESC')
|
||||
.all(cardId) as AgentRunRow[];
|
||||
return rows.map(hydrateRun);
|
||||
}
|
||||
|
||||
/** Replayable event history for a run, optionally after a sequence number. */
|
||||
events(id: string, sinceSeq = 0): AgentRunEvent[] {
|
||||
const rows = db
|
||||
.prepare('SELECT * FROM agent_run_events WHERE run_id = ? AND seq > ? ORDER BY seq ASC')
|
||||
.all(id, sinceSeq) as { id: number; run_id: string; seq: number; type: string; data: string; created_at: string }[];
|
||||
return rows.map((r) => ({
|
||||
id: r.id,
|
||||
runId: r.run_id,
|
||||
seq: r.seq,
|
||||
type: r.type,
|
||||
data: safeParse(r.data),
|
||||
createdAt: r.created_at,
|
||||
}));
|
||||
}
|
||||
|
||||
// --- internal: finalize a run -------------------------------------------
|
||||
|
||||
private settle(id: string, status: RunStatus, summary: string | null): void {
|
||||
const entry = this.active.get(id);
|
||||
const now = new Date().toISOString();
|
||||
|
||||
let commitSha: string | null = null;
|
||||
let extraError: string | null = null;
|
||||
if (entry?.worktree) {
|
||||
commitSha = headSha(entry.worktree.path);
|
||||
const dirty = dirtySummary(entry.worktree.path);
|
||||
if (dirty) {
|
||||
// Uncommitted changes left behind — surface as an error note.
|
||||
extraError = `worktree has uncommitted changes:\n${dirty}`;
|
||||
// Persist it as a run event so it shows in the feed.
|
||||
db.prepare(
|
||||
`INSERT INTO agent_run_events (run_id, seq, type, data, created_at)
|
||||
VALUES (?, ?, 'log', ?, ?)`,
|
||||
).run(
|
||||
id,
|
||||
nextSeq(id),
|
||||
JSON.stringify({ level: 'warn', text: extraError }),
|
||||
now,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
db.prepare(
|
||||
`UPDATE agent_runs
|
||||
SET status = ?, summary = ?, commit_sha = ?, error = COALESCE(?, error), finished_at = ?
|
||||
WHERE id = ?`,
|
||||
).run(status, summary, commitSha, extraError, now, id);
|
||||
|
||||
this.active.delete(id);
|
||||
|
||||
// Optional cleanup of the worktree once the run is done.
|
||||
if (entry?.cleanup && entry.worktree) {
|
||||
removeWorktree(id, entry.worktree.branch);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class RunNotFoundError extends Error {
|
||||
constructor(id: string) {
|
||||
super(`agent run not found: ${id}`);
|
||||
this.name = 'RunNotFoundError';
|
||||
}
|
||||
}
|
||||
|
||||
// --- helpers ---------------------------------------------------------------
|
||||
|
||||
function hydrateRun(row: AgentRunRow): AgentRun {
|
||||
return {
|
||||
id: row.id,
|
||||
cardId: row.card_id,
|
||||
status: row.status as RunStatus,
|
||||
useWorktree: row.use_worktree === 1,
|
||||
branch: row.branch,
|
||||
worktreePath: row.worktree_path,
|
||||
prompt: row.prompt,
|
||||
summary: row.summary,
|
||||
commitSha: row.commit_sha,
|
||||
error: row.error,
|
||||
createdAt: row.created_at,
|
||||
startedAt: row.started_at,
|
||||
finishedAt: row.finished_at,
|
||||
};
|
||||
}
|
||||
|
||||
function nextSeq(runId: string): number {
|
||||
return (
|
||||
db
|
||||
.prepare('SELECT COALESCE(MAX(seq), 0) + 1 AS n FROM agent_run_events WHERE run_id = ?')
|
||||
.get(runId) as { n: number }
|
||||
).n;
|
||||
}
|
||||
|
||||
function safeParse(s: string): Record<string, unknown> {
|
||||
try {
|
||||
return JSON.parse(s) as Record<string, unknown>;
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
/** Process-wide singleton. */
|
||||
export const runManager = new RunManager();
|
||||
Reference in New Issue
Block a user