Files
Neon-Desk/lib/server/tasks.ts
francy51 ed4420b8db Add atomic task deduplication with partial unique index
- Add partial unique index for active resource-scoped tasks
- Implement createTaskRunRecordAtomic for race-free task creation
- Update findOrEnqueueTask to use atomic insert first
- Add tests for concurrent task creation deduplication
2026-03-15 14:40:38 -04:00

238 lines
6.3 KiB
TypeScript

import { randomUUID } from 'node:crypto';
import { getRun, start } from 'workflow/api';
import type { WorkflowRunStatus } from '@workflow/world';
import type { Task, TaskStatus, TaskTimeline, TaskType } from '@/lib/types';
import { runTaskWorkflow } from '@/app/workflows/task-runner';
import { describeTaskFailure } from '@/lib/server/task-errors';
import {
countTasksByStatus,
createTaskRunRecord,
createTaskRunRecordAtomic,
findInFlightTaskByResourceKey,
getTaskByIdForUser,
listTaskStageEventsForTask,
listRecentTasksForUser,
markTaskFailure,
setTaskStatusFromWorkflow,
setTaskWorkflowRunId,
updateTaskNotificationState
} from '@/lib/server/repos/tasks';
import { buildTaskNotification } from '@/lib/server/task-notifications';
/**
 * Parameters accepted by `enqueueTask` and `findOrEnqueueTask`.
 */
type EnqueueTaskInput = {
/** Stored on the task record as `user_id`. */
userId: string;
/** Stored on the task record as `task_type`. */
taskType: TaskType;
/** Stored as the task payload; an empty object is used when omitted. */
payload?: Record<string, unknown>;
/** Stored as `priority`; 50 is used when omitted. */
priority?: number;
/** Stored as `max_attempts`; 3 is used when omitted. */
maxAttempts?: number;
/**
 * Stored as `resource_key` (`null` when omitted by `enqueueTask`).
 * `findOrEnqueueTask` only attempts deduplication when this is set.
 */
resourceKey?: string;
};
/**
 * Patch object accepted by `updateTaskNotification`; it is forwarded unchanged
 * to `updateTaskNotificationState`.
 */
type UpdateTaskNotificationInput = {
/** NOTE(review): presumably the notification's read flag — confirm against the repo layer. */
read?: boolean;
/** NOTE(review): presumably the notification's silenced flag — confirm against the repo layer. */
silenced?: boolean;
};
/**
 * Translates a workflow run status into the task status stored on the task.
 *
 * - `pending` becomes `queued`
 * - `running` and `completed` map to themselves
 * - every other value (including `failed` and `cancelled`) becomes `failed`
 *
 * @param status Status reported by the workflow run.
 * @returns The corresponding task status.
 */
function mapWorkflowStatus(status: WorkflowRunStatus): TaskStatus {
  if (status === 'pending') {
    return 'queued';
  }

  if (status === 'running' || status === 'completed') {
    return status;
  }

  // 'failed', 'cancelled' and anything unrecognised are all treated as failures.
  return 'failed';
}
/**
 * Reports whether a task is still in a non-final state (`queued` or
 * `running`); only such tasks are reconciled against their workflow run.
 *
 * @param task Task whose `status` is inspected.
 * @returns `true` for `queued` or `running`, otherwise `false`.
 */
function isProjectionPendingSync(task: Task) {
  const { status } = task;
  return status === 'running' || status === 'queued';
}
/**
 * Brings a task's stored status in line with the status of its workflow run.
 *
 * Tasks without a `workflow_run_id`, or whose status is neither `queued` nor
 * `running`, are returned untouched. Otherwise the run's status is read and
 * mapped to a task status; when it differs from the stored one the change is
 * written via `setTaskStatusFromWorkflow`.
 *
 * This is best-effort: any error thrown while reading the run or persisting
 * the change is swallowed and the task is returned as it was passed in.
 *
 * @param task Task to reconcile.
 * @returns The persisted task when the update returns one, a locally projected
 *   copy when it does not, or the original task when nothing changed or an
 *   error occurred.
 */
async function reconcileTaskWithWorkflow(task: Task) {
  const runId = task.workflow_run_id;
  if (!runId || !isProjectionPendingSync(task)) {
    return task;
  }

  try {
    const workflowStatus = await getRun(runId).status;
    const nextStatus = mapWorkflowStatus(workflowStatus);
    if (nextStatus === task.status) {
      return task;
    }

    const hasFailed = nextStatus === 'failed';
    const failure = hasFailed
      ? describeTaskFailure(
          task,
          workflowStatus === 'cancelled' ? 'Workflow run cancelled' : 'Workflow run failed'
        )
      : null;
    const errorDetail = failure?.detail ?? null;

    const persisted = await setTaskStatusFromWorkflow(
      task.id,
      nextStatus,
      errorDetail,
      failure?.summary ?? null
    );
    if (persisted !== null && persisted !== undefined) {
      return persisted;
    }

    // The update returned nothing; project the new state onto the task we
    // already hold so the caller still sees the workflow's status.
    // NOTE(review): for a queued -> running transition the projected
    // stage_detail still reads 'Workflow run completed.' — confirm whether
    // that wording is intended for non-terminal states.
    const isTerminal = nextStatus !== 'queued' && nextStatus !== 'running';
    const projected = {
      ...task,
      status: nextStatus,
      stage: nextStatus,
      stage_detail: hasFailed
        ? (failure?.summary ?? 'The background workflow stopped unexpectedly.')
        : 'Workflow run completed.',
      stage_context: null,
      error: errorDetail,
      finished_at: isTerminal ? (task.finished_at ?? new Date().toISOString()) : null
    } satisfies Omit<Task, 'notification'>;

    return {
      ...projected,
      notification: buildTaskNotification(projected)
    };
  } catch {
    // Deliberate best-effort: fall back to the task as stored.
    return task;
  }
}
/**
 * Creates a task record and starts the workflow that runs it.
 *
 * Omitted options fall back to an empty payload, priority 50, three maximum
 * attempts and a `null` resource key. No deduplication is performed here.
 *
 * @param input Owner, task type and optional settings for the new task.
 * @returns The created task with `workflow_run_id` set to the started run.
 * @throws Error when starting the workflow or storing its run id fails. The
 *   task is marked as failed first, and the thrown error carries the failure
 *   detail as its message and the original error as `cause`.
 */
export async function enqueueTask(input: EnqueueTaskInput) {
  const created = await createTaskRunRecord({
    id: randomUUID(),
    user_id: input.userId,
    task_type: input.taskType,
    payload: input.payload ?? {},
    priority: input.priority ?? 50,
    max_attempts: input.maxAttempts ?? 3,
    resource_key: input.resourceKey ?? null
  });

  try {
    const { runId } = await start(runTaskWorkflow, [created.id]);
    await setTaskWorkflowRunId(created.id, runId);

    return { ...created, workflow_run_id: runId } satisfies Task;
  } catch (startError) {
    const { detail, summary } = describeTaskFailure(created, 'Failed to start workflow');
    await markTaskFailure(created.id, detail, 'failed', { detail: summary });

    // Surface the descriptive message while keeping the root cause attached.
    throw Object.assign(new Error(detail), { cause: startError as unknown });
  }
}
/**
 * Returns the in-flight task for a resource, creating and starting a new one
 * when none exists.
 *
 * Without a `resourceKey` this simply delegates to `enqueueTask`. With one, an
 * atomic insert is attempted first; if it reports that no row was created
 * (presumably because another active task already holds the same resource
 * key), the existing in-flight task is looked up, reconciled with its
 * workflow run and returned.
 *
 * The insert and the lookup are separate calls, so the conflicting task can
 * leave the in-flight set in between (for example when it finishes, or when
 * its workflow failed to start and it was marked failed). In that case the
 * lookup finds nothing even though a fresh insert would now succeed, so the
 * insert/lookup round is retried a small, bounded number of times before
 * giving up.
 *
 * @param input Owner, task type and optional settings for the task.
 * @returns The newly created task (with `workflow_run_id` set) or the
 *   reconciled existing in-flight task.
 * @throws Error when starting the workflow or storing its run id fails for a
 *   newly created task; the task is marked failed first and the original
 *   error is attached as `cause`.
 * @throws Error('Task deduplication conflict detected but no in-flight task
 *   found') when every round hit a conflict without a visible in-flight task.
 */
export async function findOrEnqueueTask(input: EnqueueTaskInput) {
  const { resourceKey } = input;
  if (!resourceKey) {
    return await enqueueTask(input);
  }

  // Bounded so a persistent mismatch between the insert's conflict rule and
  // the lookup still ends in the original error instead of looping forever.
  const maxDedupRounds = 3;

  for (let round = 0; round < maxDedupRounds; round += 1) {
    const result = await createTaskRunRecordAtomic({
      // A fresh id per round; ids from rejected inserts were never stored.
      id: randomUUID(),
      user_id: input.userId,
      task_type: input.taskType,
      payload: input.payload ?? {},
      priority: input.priority ?? 50,
      max_attempts: input.maxAttempts ?? 3,
      resource_key: resourceKey
    });

    if (result.created) {
      try {
        const run = await start(runTaskWorkflow, [result.task.id]);
        await setTaskWorkflowRunId(result.task.id, run.runId);
        return {
          ...result.task,
          workflow_run_id: run.runId
        } satisfies Task;
      } catch (error) {
        const failure = describeTaskFailure(result.task, 'Failed to start workflow');
        await markTaskFailure(result.task.id, failure.detail, 'failed', {
          detail: failure.summary
        });
        const wrapped = new Error(failure.detail);
        (wrapped as Error & { cause?: unknown }).cause = error;
        throw wrapped;
      }
    }

    const existingTask = await findInFlightTaskByResourceKey(
      input.userId,
      input.taskType,
      resourceKey
    );
    if (existingTask) {
      return await reconcileTaskWithWorkflow(existingTask);
    }

    // Insert was rejected but the competing task is no longer in flight:
    // it left the active set between the two calls, so try the insert again.
  }

  throw new Error('Task deduplication conflict detected but no in-flight task found');
}
/**
 * Looks up the in-flight task for a user, task type and resource key and
 * reconciles it with its workflow run before returning it.
 *
 * @returns The reconciled task, or `null` when no in-flight task was found.
 */
export async function findInFlightTask(userId: string, taskType: TaskType, resourceKey: string) {
  const inFlight = await findInFlightTaskByResourceKey(userId, taskType, resourceKey);
  if (inFlight) {
    return await reconcileTaskWithWorkflow(inFlight);
  }
  return null;
}
/**
 * Loads a task by id for the given user and reconciles it with its workflow
 * run before returning it.
 *
 * @returns The reconciled task, or `null` when the lookup returns nothing.
 */
export async function getTaskById(taskId: string, userId: string) {
  const found = await getTaskByIdForUser(taskId, userId);
  if (found) {
    return await reconcileTaskWithWorkflow(found);
  }
  return null;
}
/**
 * Lists a user's recent tasks, reconciling each one with its workflow run.
 * All reconciliations are started together and awaited as a group.
 *
 * @param userId User whose tasks are listed.
 * @param limit Forwarded to the repository query; defaults to 20.
 * @param statuses Optional status filter forwarded to the repository query.
 * @returns The reconciled tasks, in the order the repository returned them.
 */
export async function listRecentTasks(userId: string, limit = 20, statuses?: TaskStatus[]) {
  const recent = await listRecentTasksForUser(userId, limit, statuses);
  const reconciliations = recent.map((entry) => reconcileTaskWithWorkflow(entry));
  return await Promise.all(reconciliations);
}
/**
 * Applies a notification patch to a user's task and returns the task after
 * reconciling it with its workflow run.
 *
 * @param userId User the task must belong to.
 * @param taskId Task whose notification state is updated.
 * @param input Patch forwarded to `updateTaskNotificationState`.
 * @returns The reconciled task, or `null` when the update returns nothing.
 */
export async function updateTaskNotification(
  userId: string,
  taskId: string,
  input: UpdateTaskNotificationInput
) {
  const patched = await updateTaskNotificationState(taskId, userId, input);
  if (patched) {
    return await reconcileTaskWithWorkflow(patched);
  }
  return null;
}
/**
 * Returns a task together with its stage events.
 *
 * The task is loaded through `getTaskById`; stage events are only fetched
 * when the task exists.
 *
 * @returns `{ task, events }`, or `null` when the task was not found.
 */
export async function getTaskTimeline(taskId: string, userId: string): Promise<TaskTimeline | null> {
  const resolvedTask = await getTaskById(taskId, userId);
  if (resolvedTask) {
    const stageEvents = await listTaskStageEventsForTask(taskId, userId);
    return { task: resolvedTask, events: stageEvents };
  }
  return null;
}
/**
 * Returns the result of `countTasksByStatus()` as the queue snapshot.
 */
export async function getTaskQueueSnapshot() {
  const snapshot = await countTasksByStatus();
  return snapshot;
}