Files
Neon-Desk/lib/server/api/task-workflow-hybrid.e2e.test.ts

320 lines
9.8 KiB
TypeScript

import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
it,
mock
} from 'bun:test';
import { mkdtempSync, readFileSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { WorkflowRunStatus } from '@workflow/world';
// Identity of the synthetic user: returned by the mocked auth session and
// inserted into the `user` table by ensureTestUser.
const TEST_USER_ID = 'e2e-user';
const TEST_USER_EMAIL = 'e2e@example.com';
const TEST_USER_NAME = 'E2E User';

/** Subset of the SQLite client surface this file relies on. */
type E2eSqliteClient = { exec: (query: string) => void; close: () => void };

/** Subset of the application surface this file relies on. */
type E2eAppHandle = { handle: (request: Request) => Promise<Response> };

// State backing the mocked workflow modules. Tests write to it directly to
// simulate run status changes and backend outages.
const runStatuses: Map<string, WorkflowRunStatus> = new Map();
let runCounter = 0;
let workflowBackendHealthy = true;

// Handles populated by the suite's setup hook; null until then.
let tempDir: string | null = null;
let sqliteClient: E2eSqliteClient | null = null;
let app: E2eAppHandle | null = null;
// Replaces 'workflow/api' with an in-memory stand-in driven by `runStatuses`
// and `runCounter`.
// NOTE(review): this registration sits outside the RUN_TASK_WORKFLOW_E2E gate,
// so it runs whenever the file is loaded — confirm it cannot affect other test
// files sharing the same process.
mock.module('workflow/api', () => {
  // start(): allocates the next sequential run id ("run-1", "run-2", ...) and
  // records it as 'pending'.
  const start = mock(async () => {
    const sequence = runCounter + 1;
    runCounter = sequence;
    const runId = `run-${sequence}`;
    runStatuses.set(runId, 'pending');
    return { runId };
  });

  // getRun(): returns a handle whose `status` getter re-reads the map on every
  // access, so a test can change a run's status after the handle was created.
  const getRun = mock((runId: string) => {
    const runHandle = {
      get status() {
        const recorded = runStatuses.get(runId);
        return Promise.resolve(recorded ?? 'pending');
      }
    };
    return runHandle;
  });

  return { start, getRun };
});
// Replaces 'workflow/runtime' with a world whose `runs.list()` either resolves
// to an empty page or rejects, depending on `workflowBackendHealthy`.
mock.module('workflow/runtime', () => {
  const getWorld = () => {
    // The flag is read at call time, so flipping it mid-test takes effect on
    // the next list() call.
    const list = async () => {
      if (workflowBackendHealthy) {
        return { data: [] };
      }
      throw new Error('Workflow backend unavailable');
    };
    return { runs: { list } };
  };

  return { getWorld };
});
// Replaces the auth-session module so every request resolves to the fixed test
// user, with a null `response`.
mock.module('@/lib/server/auth-session', () => {
  const requireAuthenticatedSession = async () => {
    const user = {
      id: TEST_USER_ID,
      email: TEST_USER_EMAIL,
      name: TEST_USER_NAME,
      image: null
    };
    return { session: { user }, response: null };
  };

  return { requireAuthenticatedSession };
});
/**
 * Closes the SQLite client cached on `globalThis` (when one exists and exposes
 * `close`) and then clears both cached database globals.
 *
 * Presumably these globals are the db module's singleton cache, so clearing
 * them forces a fresh connection on next use — confirm against the db module.
 */
function resetDbSingletons() {
  type DbGlobals = typeof globalThis & {
    __fiscalSqliteClient?: { close?: () => void };
    __fiscalDrizzleDb?: unknown;
  };

  const scope = globalThis as DbGlobals;
  const cachedClient = scope.__fiscalSqliteClient;

  // Close first: if close() throws, the globals are left untouched.
  cachedClient?.close?.();

  scope.__fiscalDrizzleDb = undefined;
  scope.__fiscalSqliteClient = undefined;
}
/**
 * Runs the project's SQL migration files against `client`, in the fixed order
 * listed below.
 *
 * Files are read from `<current working directory>/drizzle`. Each file is read
 * and executed before the next one is read, so a missing file stops the run
 * after the earlier migrations have already been applied.
 *
 * @param client - Anything exposing a synchronous `exec(sql)` method.
 */
function applySqlMigrations(client: { exec: (query: string) => void }) {
  const orderedMigrations = [
    '0000_cold_silver_centurion.sql',
    '0001_glossy_statement_snapshots.sql',
    '0002_workflow_task_projection_metadata.sql',
    '0003_task_stage_event_timeline.sql'
  ];

  orderedMigrations.forEach((fileName) => {
    const migrationPath = join(process.cwd(), 'drizzle', fileName);
    const statements = readFileSync(migrationPath, 'utf8');
    client.exec(statements);
  });
}
/**
 * Inserts the fixed test user into the `user` table, or replaces the existing
 * row (`INSERT OR REPLACE`).
 *
 * Values are interpolated from the module-level TEST_USER_* constants;
 * `createdAt` and `updatedAt` both receive the same `Date.now()` reading.
 *
 * @param client - Anything exposing a synchronous `exec(sql)` method.
 */
function ensureTestUser(client: { exec: (query: string) => void }) {
  const timestamp = Date.now();

  // Assembled line by line; the leading and trailing empty entries reproduce
  // the newline at each end of the statement.
  const statement = [
    '',
    'INSERT OR REPLACE INTO user (',
    'id, name, email, emailVerified, image, createdAt, updatedAt, role, banned, banReason, banExpires',
    ') VALUES (',
    `'${TEST_USER_ID}',`,
    `'${TEST_USER_NAME}',`,
    `'${TEST_USER_EMAIL}',`,
    '1,',
    'NULL,',
    `${timestamp},`,
    `${timestamp},`,
    'NULL,',
    '0,',
    'NULL,',
    'NULL',
    ');',
    ''
  ].join('\n');

  client.exec(statement);
}
/**
 * Deletes every row from `task_stage_event` and then from `task_run`.
 *
 * @param client - Anything exposing a synchronous `exec(sql)` method.
 */
function clearProjectionTables(client: { exec: (query: string) => void }) {
  // Stage events go first — presumably because they reference task runs;
  // confirm against the schema.
  const statements = ['DELETE FROM task_stage_event;', 'DELETE FROM task_run;'];
  for (const statement of statements) {
    client.exec(statement);
  }
}
/**
 * Sends an in-process request to the application under test and parses the
 * response body as JSON.
 *
 * @param method - HTTP method to use.
 * @param path - Path (and query string) appended to `http://localhost`.
 * @param body - Optional payload; when given it is JSON-encoded and sent with
 *   a `content-type: application/json` header.
 * @returns The raw `Response` plus its parsed JSON body.
 * @throws Error when the module-level `app` has not been initialised.
 */
async function jsonRequest(
  method: 'GET' | 'POST' | 'PATCH',
  path: string,
  body?: Record<string, unknown>
) {
  const target = app;
  if (!target) {
    throw new Error('app not initialized');
  }

  // Headers and payload are only populated when a body was supplied.
  const headers = body ? { 'content-type': 'application/json' } : undefined;
  const payload = body ? JSON.stringify(body) : undefined;
  const request = new Request(`http://localhost${path}`, {
    method,
    headers,
    body: payload
  });

  const response = await target.handle(request);
  const json = await response.json();
  return { response, json };
}
if (process.env.RUN_TASK_WORKFLOW_E2E === '1') {
describe('task workflow hybrid migration e2e', () => {
// Values of the environment variables this suite overrides, captured just
// before they are overwritten so teardown can put them back. Null until
// beforeAll has run.
let savedEnv: { DATABASE_URL: string | undefined; NODE_ENV: string | undefined } | null = null;

// One-time setup: point the app at a throwaway SQLite file, build the schema,
// seed the test user, and only then load the application module.
beforeAll(async () => {
  tempDir = mkdtempSync(join(tmpdir(), 'fiscal-task-e2e-'));

  // Cast because NODE_ENV may be typed read-only on process.env.
  const env = process.env as Record<string, string | undefined>;
  savedEnv = { DATABASE_URL: env.DATABASE_URL, NODE_ENV: env.NODE_ENV };
  env.DATABASE_URL = `file:${join(tempDir, 'e2e.sqlite')}`;
  env.NODE_ENV = 'test';

  // Drop any cached connection before the db module is asked for a client.
  resetDbSingletons();

  // Dynamic imports so the modules load after the environment overrides above
  // and after the mock.module registrations at the top of the file.
  const dbModule = await import('@/lib/server/db');
  sqliteClient = dbModule.getSqliteClient();
  applySqlMigrations(sqliteClient);
  ensureTestUser(sqliteClient);

  const appModule = await import('./app');
  app = appModule.app;
});

// One-time teardown: close the database, undo the environment overrides, drop
// the module-level handles, and remove the temp directory.
afterAll(() => {
  resetDbSingletons();

  // The client was closed by resetDbSingletons; do not leave stale handles
  // behind.
  sqliteClient = null;
  app = null;

  if (savedEnv) {
    const env = process.env as Record<string, string | undefined>;
    for (const [key, value] of Object.entries(savedEnv)) {
      if (value === undefined) {
        // Assigning undefined would store the string "undefined"; delete the
        // key to restore the "unset" state.
        delete env[key];
      } else {
        env[key] = value;
      }
    }
    savedEnv = null;
  }

  if (tempDir) {
    rmSync(tempDir, { recursive: true, force: true });
    tempDir = null;
  }
});
// Per-test reset: empty the projection tables, then return the workflow mock
// state (known runs, id counter, health flag) to its defaults.
beforeEach(() => {
  const client = sqliteClient;
  if (!client) {
    throw new Error('sqlite client not initialized');
  }

  clearProjectionTables(client);

  runStatuses.clear();
  runCounter = 0;
  workflowBackendHealthy = true;
});
// Three distinct filings each get a queued task; re-submitting the first
// filing returns the existing task instead of creating a fourth.
it('queues multiple analyze jobs and suppresses duplicate in-flight analyze jobs', async () => {
  type TaskEnvelope = { task: { id: string } };
  type TaskListing = {
    tasks: Array<{
      id: string;
      status: string;
      stage: string;
      workflow_run_id?: string | null;
    }>;
  };
  const analyzePath = (accessionNumber: string) => `/api/filings/${accessionNumber}/analyze`;

  const initial = await jsonRequest('POST', analyzePath('0000000000-26-000001'));
  expect(initial.response.status).toBe(200);
  const initialTaskId = (initial.json as TaskEnvelope).task.id;

  // Two further filings submitted concurrently.
  const concurrent = await Promise.all([
    jsonRequest('POST', analyzePath('0000000000-26-000002')),
    jsonRequest('POST', analyzePath('0000000000-26-000003'))
  ]);
  expect(concurrent[0].response.status).toBe(200);
  expect(concurrent[1].response.status).toBe(200);

  // Same filing as the first request: must resolve to the same task.
  const repeat = await jsonRequest('POST', analyzePath('0000000000-26-000001'));
  expect(repeat.response.status).toBe(200);
  expect((repeat.json as TaskEnvelope).task.id).toBe(initialTaskId);

  const listing = await jsonRequest('GET', '/api/tasks?limit=10');
  expect(listing.response.status).toBe(200);
  const { tasks } = listing.json as TaskListing;

  expect(tasks.length).toBe(3);
  expect(!tasks.some((entry) => entry.status !== 'queued')).toBe(true);
  expect(!tasks.some((entry) => entry.stage !== 'queued')).toBe(true);
  expect(
    tasks.every((entry) => typeof entry.workflow_run_id === 'string' && entry.workflow_run_id.length > 0)
  ).toBe(true);
});
// Walks a task's notification flags through read -> silenced -> reset via the
// PATCH endpoint and checks the returned timestamps after each step.
it('updates notification read and silenced state via patch endpoint', async () => {
  type NotificationEnvelope = {
    task: {
      notification_read_at: string | null;
      notification_silenced_at: string | null;
    };
  };

  const created = await jsonRequest('POST', '/api/filings/0000000000-26-000010/analyze');
  // Assert creation succeeded before reading the id, so a failed create shows
  // up as an assertion failure instead of a TypeError on `.task.id`.
  expect(created.response.status).toBe(200);
  const taskId = (created.json as { task: { id: string } }).task.id;
  const notificationPath = `/api/tasks/${taskId}/notification`;

  // Step 1: mark read only.
  const readUpdate = await jsonRequest('PATCH', notificationPath, { read: true });
  expect(readUpdate.response.status).toBe(200);
  const readTask = (readUpdate.json as NotificationEnvelope).task;
  expect(readTask.notification_read_at).toBeTruthy();
  expect(readTask.notification_silenced_at).toBeNull();

  // Step 2: silence; the read timestamp from step 1 must still be present.
  const silencedUpdate = await jsonRequest('PATCH', notificationPath, {
    silenced: true
  });
  expect(silencedUpdate.response.status).toBe(200);
  const silencedTask = (silencedUpdate.json as NotificationEnvelope).task;
  expect(silencedTask.notification_read_at).toBeTruthy();
  expect(silencedTask.notification_silenced_at).toBeTruthy();

  // Step 3: clear both flags in one request.
  const resetUpdate = await jsonRequest('PATCH', notificationPath, {
    read: false,
    silenced: false
  });
  expect(resetUpdate.response.status).toBe(200);
  const resetTask = (resetUpdate.json as NotificationEnvelope).task;
  expect(resetTask.notification_read_at).toBeNull();
  expect(resetTask.notification_silenced_at).toBeNull();
});
// Drives a task through running -> completed by changing the mocked workflow
// run status between reads, checks the recorded timeline, then checks that the
// health endpoint reports 'degraded' once the mocked backend starts failing.
it('reconciles workflow run status into projection state and degrades health when workflow backend is down', async () => {
  type HealthPayload = {
    status: string;
    workflow: { ok: boolean; reason?: string };
  };

  const created = await jsonRequest('POST', '/api/filings/0000000000-26-000100/analyze');
  // Assert creation succeeded before reading the task, so a failed create
  // shows up as an assertion failure instead of a TypeError.
  expect(created.response.status).toBe(200);
  const task = (created.json as {
    task: { id: string; workflow_run_id: string };
  }).task;
  // The run id is the key into the mock's status map; without it the status
  // changes below would be keyed under `undefined` and silently miss.
  expect(typeof task.workflow_run_id).toBe('string');
  const taskPath = `/api/tasks/${task.id}`;

  // Mocked run reports 'running': the next read is expected to reflect it.
  runStatuses.set(task.workflow_run_id, 'running');
  const running = await jsonRequest('GET', taskPath);
  expect(running.response.status).toBe(200);
  const runningTask = (running.json as { task: { status: string; stage: string } }).task;
  expect(runningTask.status).toBe('running');
  expect(runningTask.stage).toBe('running');

  // Mocked run reports 'completed': the read should also carry a finish time.
  runStatuses.set(task.workflow_run_id, 'completed');
  const completed = await jsonRequest('GET', taskPath);
  expect(completed.response.status).toBe(200);
  const completedTask = (completed.json as {
    task: {
      status: string;
      stage: string;
      finished_at: string | null;
    };
  }).task;
  expect(completedTask.status).toBe('completed');
  expect(completedTask.stage).toBe('completed');
  expect(completedTask.finished_at).toBeTruthy();

  // The timeline must contain at least one event per status the task passed
  // through.
  const timeline = await jsonRequest('GET', `${taskPath}/timeline`);
  expect(timeline.response.status).toBe(200);
  const events = (timeline.json as {
    events: Array<{
      stage: string;
      status: string;
    }>;
  }).events;
  expect(events.length).toBeGreaterThanOrEqual(3);
  expect(events.some((event) => event.status === 'queued')).toBe(true);
  expect(events.some((event) => event.status === 'running')).toBe(true);
  expect(events.some((event) => event.status === 'completed')).toBe(true);

  // Health while the mocked workflow backend is reachable.
  const healthy = await jsonRequest('GET', '/api/health');
  expect(healthy.response.status).toBe(200);
  const healthyPayload = healthy.json as HealthPayload;
  expect(healthyPayload.status).toBe('ok');
  expect(healthyPayload.workflow.ok).toBe(true);

  // Make the mocked runs.list() reject, then re-check health.
  workflowBackendHealthy = false;
  const degraded = await jsonRequest('GET', '/api/health');
  expect(degraded.response.status).toBe(503);
  const degradedPayload = degraded.json as HealthPayload;
  expect(degradedPayload.status).toBe('degraded');
  expect(degradedPayload.workflow.ok).toBe(false);
});
});
}