Files
Space-Game/apps/api/src/routes/orchestrator.ts
francy51 607f3fbd8b
Some checks failed
CI / Rust Check (push) Has been cancelled
CI / TypeScript Check (docs) (push) Has been cancelled
CI / TypeScript Check (site) (push) Has been cancelled
CI / Security Audit (push) Has been cancelled
feat(kanban): auto-resolve merge-all conflicts with an agent
When the merge-all batch hits a conflict, spawn a conflict-resolution
agent in the main worktree (where the merge is mid-conflict), await it,
and continue the batch through the remaining branches instead of
aborting for manual resolution.

- worktrees: replace abort-on-conflict mergeBranches with non-aborting
  attemptMerge (leaves the tree mid-merge, returns conflicted paths),
  plus abortMerge/hasMergeInProgress/unmergedPaths/createRecoveryBranch.
  Batch types gain conflict/resolutionRunId/recoveryRef.
- prompt: buildConflictResolutionPrompt resolves both sides preserving
  intent, honors repo conventions (no Rust warning suppression), runs
  cargo/pnpm checks, then completes the merge with a commit.
- runs: mergeAll() is async; per branch it attemptMerges, and on conflict
  starts a streamable resolution run in REPO_ROOT, awaits it, and
  verifies the merge actually completed (no MERGE_HEAD, clean tree) before
  continuing — otherwise aborts and halts. A mergeBatchInProgress guard
  prevents two batches at once; a pre-batch recovery ref snapshots HEAD.
- route: /runs/merge-all is async; node request/socket timeouts disabled
  so a long multi-conflict batch survives.
- UI: per-branch status shows "conflict resolved by agent" + resolution
  run id; a recovery-ref banner documents how to roll the batch back.

Safety: resolution only counts if verified; otherwise the merge is
aborted and main left clean. Resolution runs are real agent_runs (stream
into the dock, stoppable) and own no worktree.
EOF && echo "" && git log --oneline -3
2026-06-17 22:07:49 -04:00

232 lines
8.7 KiB
TypeScript

/**
* Orchestrator HTTP routes.
*
* Exposes run lifecycle (start/list/get/delete), interactive control
* (steer/stop), and a Server-Sent-Events stream of a run's slim event log. The
* agent itself runs in a `pi` subprocess and drives the board via the separate
* `internal` router; these routes are for the human-facing UI.
*/
import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import { db } from '../db.js';
import type { Card } from '../types.js';
import { hydrateCard } from './kanban.js';
import { RunNotFoundError, runManager } from '../orchestrator/runs.js';
import {
commitsAheadOfHead,
diffPatch,
diffStat,
mainDirtySummary,
isWorktreePresent,
mergeBranch,
} from '../orchestrator/worktrees.js';
/** Exported Hono router; the route handlers below are registered on it. */
export const orchestrator = new Hono();
/**
 * Look up a card row by id and hydrate it into a full `Card` (with
 * references/tags/comments) for prompt building.
 *
 * @param cardId - Primary key of the card to load.
 * @returns The hydrated card, or `undefined` when no row matches.
 */
function loadCard(cardId: string): Card | undefined {
  type RawCardRow = Parameters<typeof hydrateCard>[0];
  const found = db.prepare('SELECT * FROM cards WHERE id = ?').get(cardId);
  return found ? hydrateCard(found as RawCardRow) : undefined;
}
/** List recent runs; a non-empty `cardId` query parameter narrows the list to that card. */
orchestrator.get('/runs', (c) => {
  const cardFilter = c.req.query('cardId');
  if (cardFilter) {
    return c.json({ runs: runManager.listForCard(cardFilter) });
  }
  return c.json({ runs: runManager.list() });
});
/**
 * Get a single run together with its persisted event history.
 *
 * The optional `since` query parameter is passed to `runManager.events` as the
 * starting sequence; a missing or non-numeric value falls back to 0.
 * Responds 404 when the run does not exist.
 */
orchestrator.get('/runs/:id', (c) => {
  const run = runManager.get(c.req.param('id'));
  if (!run) return c.json({ error: 'run not found' }, 404);

  const requested = Number(c.req.query('since') ?? '0');
  const fromSeq = Number.isFinite(requested) ? requested : 0;
  const events = runManager.events(run.id, fromSeq);
  return c.json({ run, events });
});
/**
 * Start a new run for a card.
 *
 * JSON body fields:
 * - `cardId` (string, required) — card to run against.
 * - `prompt` (string, optional) — forwarded to the run manager when present.
 * - `useWorktree` (boolean) — treated as true unless explicitly `false`.
 * - `cleanupOnFinish` (boolean) — only enabled when explicitly `true`.
 * - `refineRunId` (string, optional) — forwarded to the run manager when present.
 *
 * Responses: 201 with `{ run }` on success; 400 when `cardId` is missing;
 * 404 when the card does not exist; 409 when the start error message begins
 * with "refinement"; 500 for any other start failure.
 */
orchestrator.post('/runs', async (c) => {
  // A syntactically valid JSON body can still be `null`, an array, or a
  // primitive. Normalize anything that is not an object to `{}` so the field
  // reads below cannot throw and the request falls through to the 400 path.
  const parsed: unknown = await c.req.json().catch(() => null);
  const body: Record<string, unknown> =
    typeof parsed === 'object' && parsed !== null ? (parsed as Record<string, unknown>) : {};

  const cardId = typeof body.cardId === 'string' ? body.cardId : '';
  if (!cardId) return c.json({ error: "'cardId' is required" }, 400);
  const card = loadCard(cardId);
  if (!card) return c.json({ error: 'card not found' }, 404);

  const prompt = typeof body.prompt === 'string' ? body.prompt : undefined;
  const useWorktree = body.useWorktree !== false; // default true
  const cleanupOnFinish = body.cleanupOnFinish === true;
  const refineRunId = typeof body.refineRunId === 'string' ? body.refineRunId : undefined;
  try {
    const run = runManager.start(card, { cardId, prompt, useWorktree, cleanupOnFinish, refineRunId });
    return c.json({ run }, 201);
  } catch (err) {
    const message = err instanceof Error ? err.message : 'failed to start run';
    // A bad refinement target (missing run / cleaned-up worktree) is a client error.
    const status = message.startsWith('refinement') ? 409 : 500;
    return c.json({ error: message }, status);
  }
});
/**
 * Steer or follow-up an active run.
 *
 * JSON body fields:
 * - `text` (string, required) — trimmed before use; empty after trimming is rejected.
 * - `mode` — `'followUp'` selects follow-up mode; any other value means `'steer'`.
 *
 * Responses: `{ ok: true }` on success; 400 when `text` is missing or blank;
 * 409 when the run manager raises `RunNotFoundError`. Other errors propagate.
 */
orchestrator.post('/runs/:id/message', async (c) => {
  const id = c.req.param('id');
  // A syntactically valid JSON body can still be `null`, an array, or a
  // primitive. Normalize anything that is not an object to `{}` so the field
  // reads below cannot throw and the request falls through to the 400 path.
  const parsed: unknown = await c.req.json().catch(() => null);
  const body: Record<string, unknown> =
    typeof parsed === 'object' && parsed !== null ? (parsed as Record<string, unknown>) : {};

  const text = typeof body.text === 'string' ? body.text.trim() : '';
  const mode = body.mode === 'followUp' ? 'followUp' : 'steer';
  if (!text) return c.json({ error: "'text' is required" }, 400);
  try {
    runManager.message(id, text, mode);
    return c.json({ ok: true });
  } catch (err) {
    if (err instanceof RunNotFoundError) return c.json({ error: 'run is not active' }, 409);
    throw err;
  }
});
/**
 * Stop an active or wedged run (force-settles a run with no live process).
 * Responds 409 when the run manager raises `RunNotFoundError`; any other
 * error is rethrown.
 */
orchestrator.post('/runs/:id/stop', (c) => {
  const runId = c.req.param('id');
  try {
    runManager.stop(runId);
  } catch (failure) {
    if (!(failure instanceof RunNotFoundError)) throw failure;
    return c.json({ error: 'run is not active' }, 409);
  }
  return c.json({ ok: true });
});
/**
 * Delete a (settled) run and reclaim its worktree.
 * Responds 204 on success. On failure the error message is returned with 409
 * when it mentions "active", otherwise 404.
 */
orchestrator.delete('/runs/:id', (c) => {
  const runId = c.req.param('id');
  try {
    runManager.remove(runId);
    return c.body(null, 204);
  } catch (failure) {
    const reason = failure instanceof Error ? failure.message : 'failed to delete run';
    return c.json({ error: reason }, reason.includes('active') ? 409 : 404);
  }
});
// --- worktree review & playtesting ----------------------------------------
/**
 * Diff of a run's branch vs main: commits, --stat, and the capped patch.
 * Responds 404 for an unknown run and 409 when the run has no branch or its
 * worktree is not present.
 */
orchestrator.get('/runs/:id/diff', (c) => {
  const run = runManager.get(c.req.param('id'));
  if (!run) return c.json({ error: 'run not found' }, 404);

  const branch = run.branch;
  const worktreePath = run.worktreePath;
  // Guards run in the same order as a short-circuit chain, so the worktree
  // presence check only happens once a branch and a path are known.
  if (!branch) return c.json({ error: 'run has no worktree to diff' }, 409);
  if (!worktreePath) return c.json({ error: 'run has no worktree to diff' }, 409);
  if (!isWorktreePresent(worktreePath)) return c.json({ error: 'run has no worktree to diff' }, 409);

  const patchResult = diffPatch(branch);
  const commits = commitsAheadOfHead(branch);
  const stat = diffStat(branch);
  return c.json({
    branch,
    commits,
    stat,
    patch: patchResult.patch,
    truncated: patchResult.truncated,
  });
});
/**
 * Merge a run's branch into the main worktree's checked-out branch.
 * Hard preconditions (unknown run, no branch, dirty main worktree) produce
 * HTTP errors; the merge outcome itself is returned as the response body.
 */
orchestrator.post('/runs/:id/merge', (c) => {
  const run = runManager.get(c.req.param('id'));
  if (!run) return c.json({ error: 'run not found' }, 404);

  const { branch } = run;
  if (!branch) return c.json({ error: 'run has no branch to merge' }, 409);

  // Merging into a dirty main worktree would be unsafe, so refuse up front.
  const dirtyReason = mainDirtySummary();
  if (dirtyReason) return c.json({ error: `Cannot merge: ${dirtyReason}` }, 409);

  // Success or conflict-aborted, the outcome travels in the body so the UI can
  // read `ok` directly.
  const outcome = mergeBranch(branch);
  return c.json(outcome);
});
/**
 * Merge all reviewable agent branches into the main branch sequentially,
 * spawning a conflict-resolution agent on any conflict. The request may block
 * for a long time while agents resolve conflicts. Any failure is reported as
 * a 409 carrying the error message.
 */
orchestrator.post('/runs/merge-all', async (c) => {
  try {
    const batchResult = await runManager.mergeAll();
    return c.json(batchResult);
  } catch (failure) {
    const reason = failure instanceof Error ? failure.message : 'merge-all failed';
    return c.json({ error: reason }, 409);
  }
});
/** Report whether a Bevy playtest is running for a run; 404 when the run is unknown. */
orchestrator.get('/runs/:id/bevy', (c) => {
  const run = runManager.get(c.req.param('id'));
  return run
    ? c.json({ running: runManager.bevyRunning(run.id) })
    : c.json({ error: 'run not found' }, 404);
});
/**
 * Launch a Bevy playtest (`cargo run`) in a run's worktree.
 * Responds 404 on `RunNotFoundError`, 409 when the error message mentions
 * "no worktree" or "already running", and 500 for anything else.
 */
orchestrator.post('/runs/:id/bevy', (c) => {
  const runId = c.req.param('id');
  try {
    runManager.startBevy(runId);
    return c.json({ ok: true });
  } catch (failure) {
    if (failure instanceof RunNotFoundError) return c.json({ error: 'run not found' }, 404);
    const reason = failure instanceof Error ? failure.message : 'failed to start Bevy';
    const isConflict = ['no worktree', 'already running'].some((needle) => reason.includes(needle));
    return c.json({ error: reason }, isConflict ? 409 : 500);
  }
});
/** Ask the run manager to stop a run's Bevy playtest, then acknowledge with `{ ok: true }`. */
orchestrator.post('/runs/:id/bevy/stop', (c) => {
  const runId = c.req.param('id');
  runManager.stopBevy(runId);
  return c.json({ ok: true });
});
/**
 * Live event stream for a run (Server-Sent Events).
 *
 * Each SSE message has event name `event` and a JSON payload of
 * `{ seq, type, data, createdAt }`. The optional `since` query parameter is
 * the last sequence number the client has already seen (non-numeric values
 * fall back to 0). Responds 404 when the run does not exist. The stream stays
 * open until the client disconnects.
 */
orchestrator.get('/runs/:id/stream', (c) => {
  const id = c.req.param('id');
  const run = runManager.get(id);
  if (!run) return c.json({ error: 'run not found' }, 404);
  return streamSSE(c, async (stream) => {
    // Highest sequence number already written to this client.
    let lastSeq = Number(c.req.query('since') ?? '0');
    if (!Number.isFinite(lastSeq)) lastSeq = 0;

    // Replay any persisted history the client hasn't seen yet.
    const flushHistory = () => {
      for (const ev of runManager.events(id, lastSeq)) {
        lastSeq = ev.seq;
        void stream.writeSSE({ event: 'event', data: JSON.stringify({ seq: ev.seq, type: ev.type, data: ev.data, createdAt: ev.createdAt }) });
      }
    };
    flushHistory();

    // Subscribe to live events. Each carries its persisted `seq`, so advancing
    // `lastSeq` here keeps the periodic history flush below from re-emitting it.
    const unsubscribe = runManager.subscribe(id, (ev) => {
      if (ev.seq <= lastSeq) return;
      // A gap between the last delivered seq and this live event may hide
      // events written straight to the DB that the periodic flush has not
      // picked up yet. Jumping `lastSeq` over them would drop them for good,
      // so replay persisted history first to deliver them in order.
      if (ev.seq > lastSeq + 1) {
        flushHistory();
        // The replay may already have delivered this very event.
        if (ev.seq <= lastSeq) return;
      }
      lastSeq = ev.seq;
      const { seq, createdAt, type, ...data } = ev;
      void stream.writeSSE({ event: 'event', data: JSON.stringify({ seq, type, data, createdAt }) });
    });

    // Periodic flush of persisted history catches events written straight to the
    // DB (agent `curl` mutations, settle-time warnings) that bypass the live
    // subscriber. Since `lastSeq` tracks everything delivered, this is a no-op
    // except for those DB-direct events.
    const timer = setInterval(flushHistory, 1500);

    try {
      // Keep the stream open until the client disconnects. `onAbort` fires on
      // disconnect; resolving the promise lets `streamSSE` finish cleanly.
      await new Promise<void>((resolve) => {
        stream.onAbort(() => resolve());
      });
    } finally {
      // Always release the timer and the subscription, even on an unexpected throw.
      clearInterval(timer);
      unsubscribe();
    }
  });
});