173 lines
4.3 KiB
TypeScript
173 lines
4.3 KiB
TypeScript
import { randomUUID } from 'node:crypto';
|
|
import { getRun, start } from 'workflow/api';
|
|
import type { WorkflowRunStatus } from '@workflow/world';
|
|
import type { Task, TaskStatus, TaskTimeline, TaskType } from '@/lib/types';
|
|
import { runTaskWorkflow } from '@/app/workflows/task-runner';
|
|
import {
|
|
countTasksByStatus,
|
|
createTaskRunRecord,
|
|
findInFlightTaskByResourceKey,
|
|
getTaskByIdForUser,
|
|
listTaskStageEventsForTask,
|
|
listRecentTasksForUser,
|
|
markTaskFailure,
|
|
setTaskStatusFromWorkflow,
|
|
setTaskWorkflowRunId,
|
|
updateTaskNotificationState
|
|
} from '@/lib/server/repos/tasks';
|
|
|
|
type EnqueueTaskInput = {
|
|
userId: string;
|
|
taskType: TaskType;
|
|
payload?: Record<string, unknown>;
|
|
priority?: number;
|
|
maxAttempts?: number;
|
|
resourceKey?: string;
|
|
};
|
|
|
|
type UpdateTaskNotificationInput = {
|
|
read?: boolean;
|
|
silenced?: boolean;
|
|
};
|
|
|
|
function mapWorkflowStatus(status: WorkflowRunStatus): TaskStatus {
|
|
switch (status) {
|
|
case 'pending':
|
|
return 'queued';
|
|
case 'running':
|
|
return 'running';
|
|
case 'completed':
|
|
return 'completed';
|
|
case 'failed':
|
|
case 'cancelled':
|
|
return 'failed';
|
|
default:
|
|
return 'failed';
|
|
}
|
|
}
|
|
|
|
function isProjectionPendingSync(task: Task) {
|
|
return task.status === 'queued' || task.status === 'running';
|
|
}
|
|
|
|
async function reconcileTaskWithWorkflow(task: Task) {
|
|
if (!task.workflow_run_id || !isProjectionPendingSync(task)) {
|
|
return task;
|
|
}
|
|
|
|
try {
|
|
const run = getRun(task.workflow_run_id);
|
|
const workflowStatus = await run.status;
|
|
const nextStatus = mapWorkflowStatus(workflowStatus);
|
|
|
|
if (nextStatus === task.status) {
|
|
return task;
|
|
}
|
|
|
|
const nextError = nextStatus === 'failed'
|
|
? workflowStatus === 'cancelled'
|
|
? 'Workflow run cancelled'
|
|
: 'Workflow run failed'
|
|
: null;
|
|
|
|
const updated = await setTaskStatusFromWorkflow(task.id, nextStatus, nextError);
|
|
|
|
return updated ?? {
|
|
...task,
|
|
status: nextStatus,
|
|
stage: nextStatus,
|
|
stage_detail: null,
|
|
error: nextError,
|
|
finished_at: nextStatus === 'queued' || nextStatus === 'running'
|
|
? null
|
|
: task.finished_at ?? new Date().toISOString()
|
|
};
|
|
} catch {
|
|
return task;
|
|
}
|
|
}
|
|
|
|
export async function enqueueTask(input: EnqueueTaskInput) {
|
|
const task = await createTaskRunRecord({
|
|
id: randomUUID(),
|
|
user_id: input.userId,
|
|
task_type: input.taskType,
|
|
payload: input.payload ?? {},
|
|
priority: input.priority ?? 50,
|
|
max_attempts: input.maxAttempts ?? 3,
|
|
resource_key: input.resourceKey ?? null
|
|
});
|
|
|
|
try {
|
|
const run = await start(runTaskWorkflow, [task.id]);
|
|
await setTaskWorkflowRunId(task.id, run.runId);
|
|
|
|
return {
|
|
...task,
|
|
workflow_run_id: run.runId
|
|
} satisfies Task;
|
|
} catch (error) {
|
|
const reason = error instanceof Error
|
|
? error.message
|
|
: 'Failed to start workflow';
|
|
await markTaskFailure(task.id, reason, 'failed');
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
export async function findInFlightTask(userId: string, taskType: TaskType, resourceKey: string) {
|
|
const task = await findInFlightTaskByResourceKey(userId, taskType, resourceKey);
|
|
|
|
if (!task) {
|
|
return null;
|
|
}
|
|
|
|
return await reconcileTaskWithWorkflow(task);
|
|
}
|
|
|
|
export async function getTaskById(taskId: string, userId: string) {
|
|
const task = await getTaskByIdForUser(taskId, userId);
|
|
|
|
if (!task) {
|
|
return null;
|
|
}
|
|
|
|
return await reconcileTaskWithWorkflow(task);
|
|
}
|
|
|
|
export async function listRecentTasks(userId: string, limit = 20, statuses?: TaskStatus[]) {
|
|
const tasks = await listRecentTasksForUser(userId, limit, statuses);
|
|
return await Promise.all(tasks.map((task) => reconcileTaskWithWorkflow(task)));
|
|
}
|
|
|
|
export async function updateTaskNotification(
|
|
userId: string,
|
|
taskId: string,
|
|
input: UpdateTaskNotificationInput
|
|
) {
|
|
const task = await updateTaskNotificationState(taskId, userId, input);
|
|
if (!task) {
|
|
return null;
|
|
}
|
|
|
|
return await reconcileTaskWithWorkflow(task);
|
|
}
|
|
|
|
export async function getTaskTimeline(taskId: string, userId: string): Promise<TaskTimeline | null> {
|
|
const task = await getTaskById(taskId, userId);
|
|
if (!task) {
|
|
return null;
|
|
}
|
|
|
|
const events = await listTaskStageEventsForTask(taskId, userId);
|
|
|
|
return {
|
|
task,
|
|
events
|
|
};
|
|
}
|
|
|
|
export async function getTaskQueueSnapshot() {
|
|
return await countTasksByStatus();
|
|
}
|