import { and, asc, desc, eq, inArray, sql } from 'drizzle-orm';
import type {
  Task,
  TaskStage,
  TaskStageContext,
  TaskStageEvent,
  TaskStatus,
  TaskType
} from '@/lib/types';
import { db } from '@/lib/server/db';
import { taskRun, taskStageEvent } from '@/lib/server/db/schema';
import { buildTaskNotification } from '@/lib/server/task-notifications';

/** Row shape returned when selecting from the `taskRun` table. */
type TaskRow = typeof taskRun.$inferSelect;

/** Row shape returned when selecting from the `taskStageEvent` table. */
type TaskStageEventRow = typeof taskStageEvent.$inferSelect;

/** Caller-supplied fields for a new task run; everything else is initialised by `createTaskRunRecord`. */
type CreateTaskInput = {
  id: string;
  user_id: string;
  task_type: TaskType;
  payload: Record<string, unknown>;
  priority: number;
  max_attempts: number;
  resource_key?: string | null;
};

/** Notification flags that can be toggled; an omitted flag is left untouched. */
type UpdateTaskNotificationStateInput = {
  read?: boolean;
  silenced?: boolean;
};

/** Fully resolved values for one `taskStageEvent` insert. */
type EventInsertInput = {
  task_id: string;
  user_id: string;
  stage: TaskStage;
  stage_detail: string | null;
  stage_context: TaskStageContext | null;
  status: TaskStatus;
  created_at: string;
};

/** Optional overrides for the stage detail/context written when a task finishes or fails. */
type TaskCompletionState = {
  detail?: string | null;
  context?: TaskStageContext | null;
};

/** Anything that can run inserts: the root `db` handle or a transaction handle. */
type InsertExecutor = Pick<typeof db, 'insert'>;

/** Default page size used by `listRecentTasksForUser`. */
const DEFAULT_RECENT_TASK_LIMIT = 20;

/** Upper bound applied to the page size in `listRecentTasksForUser`. */
const MAX_RECENT_TASK_LIMIT = 200;

/**
 * Converts a `taskRun` row into the `Task` shape, attaching the notification
 * produced by `buildTaskNotification`.
 */
function toTask(row: TaskRow): Task {
  const task = {
    id: row.id,
    user_id: row.user_id,
    task_type: row.task_type,
    status: row.status,
    stage: row.stage as TaskStage,
    stage_detail: row.stage_detail,
    stage_context: row.stage_context ?? null,
    resource_key: row.resource_key,
    notification_read_at: row.notification_read_at,
    notification_silenced_at: row.notification_silenced_at,
    priority: row.priority,
    payload: row.payload,
    result: row.result,
    error: row.error,
    attempts: row.attempts,
    max_attempts: row.max_attempts,
    workflow_run_id: row.workflow_run_id,
    created_at: row.created_at,
    updated_at: row.updated_at,
    finished_at: row.finished_at
  } satisfies Omit<Task, 'notification'>;

  return {
    ...task,
    notification: buildTaskNotification(task)
  };
}

/** Converts a `taskStageEvent` row into the `TaskStageEvent` shape. */
function toTaskStageEvent(row: TaskStageEventRow): TaskStageEvent {
  return {
    id: row.id,
    task_id: row.task_id,
    user_id: row.user_id,
    stage: row.stage as TaskStage,
    stage_detail: row.stage_detail,
    stage_context: row.stage_context ?? null,
    status: row.status as TaskStatus,
    created_at: row.created_at
  };
}

/**
 * Maps a task status to the stage of the same name.
 * Any status without an explicit case maps to `'failed'`.
 */
function statusToStage(status: TaskStatus): TaskStage {
  switch (status) {
    case 'queued':
      return 'queued';
    case 'running':
      return 'running';
    case 'completed':
      return 'completed';
    case 'failed':
      return 'failed';
    default:
      return 'failed';
  }
}

/** Inserts a single stage event using the given executor. */
async function insertTaskStageEvent(executor: InsertExecutor, input: EventInsertInput) {
  await executor.insert(taskStageEvent).values({
    task_id: input.task_id,
    user_id: input.user_id,
    stage: input.stage,
    stage_detail: input.stage_detail,
    stage_context: input.stage_context,
    status: input.status,
    created_at: input.created_at
  });
}

/**
 * Inserts a stage event that mirrors the current stage/status of a task row.
 *
 * @param executor - Handle to run the insert on (pass the transaction to keep it atomic).
 * @param row - Task row whose stage fields are copied into the event.
 * @param createdAt - Timestamp stored as the event's `created_at`.
 */
async function insertStageEventForRow(executor: InsertExecutor, row: TaskRow, createdAt: string) {
  await insertTaskStageEvent(executor, {
    task_id: row.id,
    user_id: row.user_id,
    stage: row.stage as TaskStage,
    stage_detail: row.stage_detail,
    stage_context: row.stage_context ?? null,
    status: row.status,
    created_at: createdAt
  });
}

/**
 * Inserts a new task run in the `queued` status/stage and, in the same
 * transaction, records its initial stage event.
 *
 * @returns The created task.
 */
export async function createTaskRunRecord(input: CreateTaskInput): Promise<Task> {
  const now = new Date().toISOString();

  return await db.transaction(async (tx) => {
    const [row] = await tx
      .insert(taskRun)
      .values({
        id: input.id,
        user_id: input.user_id,
        task_type: input.task_type,
        status: 'queued',
        stage: 'queued',
        stage_detail: null,
        stage_context: null,
        resource_key: input.resource_key ?? null,
        notification_read_at: null,
        notification_silenced_at: null,
        priority: input.priority,
        payload: input.payload,
        result: null,
        error: null,
        attempts: 0,
        max_attempts: input.max_attempts,
        workflow_run_id: null,
        created_at: now,
        updated_at: now,
        finished_at: null
      })
      .returning();

    await insertStageEventForRow(tx, row, now);

    return toTask(row);
  });
}

/** Stores the workflow run id on a task and bumps `updated_at`. No stage event is written. */
export async function setTaskWorkflowRunId(taskId: string, workflowRunId: string): Promise<void> {
  await db
    .update(taskRun)
    .set({
      workflow_run_id: workflowRunId,
      updated_at: new Date().toISOString()
    })
    .where(eq(taskRun.id, taskId));
}

/**
 * Loads a task by id, scoped to its owner.
 *
 * @returns The task, or `null` when no row matches both the id and the user.
 */
export async function getTaskByIdForUser(taskId: string, userId: string): Promise<Task | null> {
  const [row] = await db
    .select()
    .from(taskRun)
    .where(and(eq(taskRun.id, taskId), eq(taskRun.user_id, userId)))
    .limit(1);

  return row ? toTask(row) : null;
}

/**
 * Lists a user's tasks, most recently updated first (ties broken by creation time).
 *
 * @param userId - Owner of the tasks.
 * @param limit - Requested page size; truncated to an integer and clamped to 1..200.
 *   `NaN` falls back to the default of 20 instead of reaching the query.
 * @param statuses - When non-empty, only tasks in one of these statuses are returned.
 */
export async function listRecentTasksForUser(
  userId: string,
  limit = DEFAULT_RECENT_TASK_LIMIT,
  statuses?: TaskStatus[]
): Promise<Task[]> {
  // Math.min/Math.max propagate NaN, so it has to be filtered out before clamping.
  const requested = Number.isNaN(limit) ? DEFAULT_RECENT_TASK_LIMIT : Math.trunc(limit);
  const safeLimit = Math.min(Math.max(requested, 1), MAX_RECENT_TASK_LIMIT);

  const ownedByUser = eq(taskRun.user_id, userId);
  const filter =
    statuses && statuses.length > 0
      ? and(ownedByUser, inArray(taskRun.status, statuses))
      : ownedByUser;

  const rows = await db
    .select()
    .from(taskRun)
    .where(filter)
    .orderBy(desc(taskRun.updated_at), desc(taskRun.created_at))
    .limit(safeLimit);

  return rows.map(toTask);
}

/**
 * Counts task runs grouped by status across all users.
 *
 * @returns A map from status to row count; statuses with no rows are absent.
 */
export async function countTasksByStatus(): Promise<Record<string, number>> {
  const rows = await db
    .select({
      status: taskRun.status,
      count: sql`count(*)`
    })
    .from(taskRun)
    .groupBy(taskRun.status);

  const queue: Record<string, number> = {};
  for (const row of rows) {
    // Coerced with Number() so the map always holds numbers, whatever the driver returns.
    queue[row.status] = Number(row.count);
  }

  return queue;
}

/**
 * Finds the most recently updated `queued` or `running` task for the given
 * user, task type, and resource key.
 *
 * @returns The matching task, or `null` when none is in flight.
 */
export async function findInFlightTaskByResourceKey(
  userId: string,
  taskType: TaskType,
  resourceKey: string
): Promise<Task | null> {
  const [row] = await db
    .select()
    .from(taskRun)
    .where(
      and(
        eq(taskRun.user_id, userId),
        eq(taskRun.task_type, taskType),
        eq(taskRun.resource_key, resourceKey),
        inArray(taskRun.status, ['queued', 'running'])
      )
    )
    .orderBy(desc(taskRun.updated_at), desc(taskRun.created_at))
    .limit(1);

  return row ? toTask(row) : null;
}

/**
 * Moves a task to `running`, increments its attempt counter, clears
 * `finished_at`, and records a stage event in the same transaction.
 *
 * @returns The updated task, or `null` when the task does not exist.
 */
export async function markTaskRunning(taskId: string): Promise<Task | null> {
  const now = new Date().toISOString();

  return await db.transaction(async (tx) => {
    const [row] = await tx
      .update(taskRun)
      .set({
        status: 'running',
        stage: 'running',
        stage_detail: 'Workflow task is now running',
        stage_context: null,
        // Incremented in SQL so the new value is based on the stored counter.
        attempts: sql`${taskRun.attempts} + 1`,
        updated_at: now,
        finished_at: null
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    if (!row) {
      return null;
    }

    await insertStageEventForRow(tx, row, now);

    return toTask(row);
  });
}

/**
 * Updates a task's stage, detail, and context without touching its status.
 * A stage event is recorded only when the stage value actually changes;
 * detail/context-only updates do not produce an event.
 *
 * @returns The updated task, or `null` when the task does not exist.
 */
export async function updateTaskStage(
  taskId: string,
  stage: TaskStage,
  detail: string | null = null,
  context: TaskStageContext | null = null
): Promise<Task | null> {
  const now = new Date().toISOString();

  return await db.transaction(async (tx) => {
    // Read the current row first so the previous stage can be compared below.
    const [current] = await tx
      .select()
      .from(taskRun)
      .where(eq(taskRun.id, taskId))
      .limit(1);

    if (!current) {
      return null;
    }

    const [row] = await tx
      .update(taskRun)
      .set({
        stage,
        stage_detail: detail,
        stage_context: context,
        updated_at: now
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    if (!row) {
      return null;
    }

    if (current.stage !== stage) {
      await insertStageEventForRow(tx, row, now);
    }

    return toTask(row);
  });
}

/**
 * Marks a task as `completed`, stores its result, clears any error, and
 * records a stage event in the same transaction.
 *
 * @param completion - Optional stage detail/context; the detail defaults to
 *   `'Task finished successfully.'` and the context to `null`.
 * @returns The updated task, or `null` when the task does not exist.
 */
export async function completeTask(
  taskId: string,
  result: Record<string, unknown>,
  completion: TaskCompletionState = {}
): Promise<Task | null> {
  const now = new Date().toISOString();

  return await db.transaction(async (tx) => {
    const [row] = await tx
      .update(taskRun)
      .set({
        status: 'completed',
        stage: 'completed',
        stage_detail: completion.detail ?? 'Task finished successfully.',
        stage_context: completion.context ?? null,
        result,
        error: null,
        updated_at: now,
        finished_at: now
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    if (!row) {
      return null;
    }

    await insertStageEventForRow(tx, row, now);

    return toTask(row);
  });
}

/**
 * Marks a task as `failed`, stores the reason as its error, and records a
 * stage event in the same transaction.
 *
 * @param reason - Stored in `error`; also used as the stage detail when
 *   `failure.detail` is not provided.
 * @param stage - Stage to record alongside the failed status (defaults to `'failed'`).
 * @param failure - Optional stage detail/context overrides.
 * @returns The updated task, or `null` when the task does not exist.
 */
export async function markTaskFailure(
  taskId: string,
  reason: string,
  stage: TaskStage = 'failed',
  failure: TaskCompletionState = {}
): Promise<Task | null> {
  const now = new Date().toISOString();

  return await db.transaction(async (tx) => {
    const [row] = await tx
      .update(taskRun)
      .set({
        status: 'failed',
        stage,
        stage_detail: failure.detail ?? reason,
        stage_context: failure.context ?? null,
        error: reason,
        updated_at: now,
        finished_at: now
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    if (!row) {
      return null;
    }

    await insertStageEventForRow(tx, row, now);

    return toTask(row);
  });
}

/**
 * Synchronises a task with a status reported by the workflow engine.
 *
 * The stage is derived from the status, the stage context is cleared, and
 * `finished_at` is set for `completed`/`failed` and cleared otherwise. When
 * the stored row already matches the target state, nothing is written and no
 * stage event is recorded.
 *
 * @param error - Error text for a `failed` status (defaults to
 *   `'Workflow run failed'`); ignored for every other status.
 * @param detail - Stage detail override, used only for a `failed` status.
 * @returns The resulting task, or `null` when the task does not exist.
 */
export async function setTaskStatusFromWorkflow(
  taskId: string,
  status: TaskStatus,
  error?: string | null,
  detail?: string | null
): Promise<Task | null> {
  const isTerminal = status === 'completed' || status === 'failed';
  const nextStage = statusToStage(status);
  const nextError = status === 'failed' ? (error ?? 'Workflow run failed') : null;
  const nextDetail = nextStatusDetail(status, nextError, detail);

  return await db.transaction(async (tx) => {
    const [current] = await tx
      .select()
      .from(taskRun)
      .where(eq(taskRun.id, taskId))
      .limit(1);

    if (!current) {
      return null;
    }

    // Skip the write (and the event) when the row is already in the target state.
    const hasNoStateChange =
      current.status === status &&
      current.stage === nextStage &&
      (current.error ?? null) === nextError &&
      (current.stage_detail ?? null) === (nextDetail ?? null) &&
      (current.stage_context ?? null) === null &&
      (isTerminal ? current.finished_at !== null : current.finished_at === null);

    if (hasNoStateChange) {
      return toTask(current);
    }

    const now = new Date().toISOString();
    const [row] = await tx
      .update(taskRun)
      .set({
        status,
        stage: nextStage,
        stage_detail: nextDetail,
        stage_context: null,
        error: nextError,
        updated_at: now,
        finished_at: isTerminal ? now : null
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    if (!row) {
      return null;
    }

    await insertStageEventForRow(tx, row, now);

    return toTask(row);
  });
}

/**
 * Updates the read/silenced notification timestamps of a user's task.
 *
 * - `read`: `true` stamps `notification_read_at` with the current time, `false` clears it.
 * - `silenced`: `true` stamps `notification_silenced_at`, `false` clears it.
 *   Silencing also marks the notification as read, even if `read: false`
 *   was passed in the same call.
 *
 * When neither flag is a boolean, nothing is written and the current task is returned.
 *
 * @returns The resulting task, or `null` when no task matches the id and user.
 */
export async function updateTaskNotificationState(
  taskId: string,
  userId: string,
  input: UpdateTaskNotificationStateInput
): Promise<Task | null> {
  const now = new Date().toISOString();
  const patch: Partial<typeof taskRun.$inferInsert> = {
    updated_at: now
  };
  let hasMutation = false;

  if (typeof input.read === 'boolean') {
    patch.notification_read_at = input.read ? now : null;
    hasMutation = true;
  }

  if (typeof input.silenced === 'boolean') {
    patch.notification_silenced_at = input.silenced ? now : null;
    hasMutation = true;

    if (input.silenced) {
      // Runs after the `read` branch on purpose: silencing always marks as read.
      patch.notification_read_at = now;
    }
  }

  if (!hasMutation) {
    return await getTaskByIdForUser(taskId, userId);
  }

  const [row] = await db
    .update(taskRun)
    .set(patch)
    .where(and(eq(taskRun.id, taskId), eq(taskRun.user_id, userId)))
    .returning();

  return row ? toTask(row) : null;
}

/**
 * Chooses the stage detail text for a workflow-reported status.
 *
 * - `failed`: the explicit detail, else the error, else `'Workflow run failed'`.
 * - `completed` / `running`: a fixed message.
 * - anything else: `null`.
 */
function nextStatusDetail(status: TaskStatus, error?: string | null, detail?: string | null) {
  if (status === 'failed') {
    return detail ?? error ?? 'Workflow run failed';
  }

  if (status === 'completed') {
    return 'Workflow run completed.';
  }

  if (status === 'running') {
    return 'Workflow task is now running';
  }

  return null;
}

/**
 * Lists the stage events of a user's task, oldest first
 * (ordered by `created_at`, then by event id).
 */
export async function listTaskStageEventsForTask(
  taskId: string,
  userId: string
): Promise<TaskStageEvent[]> {
  const rows = await db
    .select()
    .from(taskStageEvent)
    .where(and(eq(taskStageEvent.task_id, taskId), eq(taskStageEvent.user_id, userId)))
    .orderBy(asc(taskStageEvent.created_at), asc(taskStageEvent.id));

  return rows.map(toTaskStageEvent);
}

/**
 * Loads a task by id without any ownership check.
 *
 * @returns The task, or `null` when the task does not exist.
 */
export async function getTaskById(taskId: string): Promise<Task | null> {
  const [row] = await db
    .select()
    .from(taskRun)
    .where(eq(taskRun.id, taskId))
    .limit(1);

  return row ? toTask(row) : null;
}