- Add partial unique index for active resource-scoped tasks - Implement createTaskRunRecordAtomic for race-free task creation - Update findOrEnqueueTask to use atomic insert first - Add tests for concurrent task creation deduplication
623 lines
15 KiB
TypeScript
623 lines
15 KiB
TypeScript
import { and, asc, desc, eq, inArray, sql } from 'drizzle-orm';
|
|
import type { Task, TaskStage, TaskStageContext, TaskStageEvent, TaskStatus, TaskType } from '@/lib/types';
|
|
import { db } from '@/lib/server/db';
|
|
import { taskRun, taskStageEvent } from '@/lib/server/db/schema';
|
|
import { buildTaskNotification } from '@/lib/server/task-notifications';
|
|
|
|
/** Row shape inferred for selects from the `taskRun` table. */
type TaskRow = (typeof taskRun)['$inferSelect'];

/** Row shape inferred for selects from the `taskStageEvent` table. */
type TaskStageEventRow = (typeof taskStageEvent)['$inferSelect'];
|
|
|
|
/**
 * Caller-supplied fields for a new task row.
 *
 * Every other column (status, stage, timestamps, attempts, ...) is filled in
 * by the create functions in this module.
 */
type CreateTaskInput = {
  /** Primary key to store the task under. */
  id: string;
  /** Owner of the task. */
  user_id: string;
  /** Kind of work the task represents. */
  task_type: TaskType;
  /** Optional resource key; stored as `null` when omitted. */
  resource_key?: string | null;
  /** Payload stored on the task row. */
  payload: { [key: string]: unknown };
  /** Priority value stored on the task row. */
  priority: number;
  /** Value stored in the row's `max_attempts` column. */
  max_attempts: number;
};
|
|
|
|
/**
 * Requested notification changes for a task.
 *
 * A field is only applied when it is an actual boolean; omitted fields leave
 * the corresponding column untouched.
 */
type UpdateTaskNotificationStateInput = {
  /** `true` stamps the silenced timestamp (and marks the task read); `false` clears it. */
  silenced?: boolean;
  /** `true` stamps the read timestamp; `false` clears it. */
  read?: boolean;
};
|
|
|
|
/** Column values written for one row of the `taskStageEvent` table. */
type EventInsertInput = {
  /** Task the event belongs to. */
  task_id: string;
  /** Owner of the task. */
  user_id: string;
  /** Task status recorded with the event. */
  status: TaskStatus;
  /** Stage recorded with the event. */
  stage: TaskStage;
  /** Optional human-readable detail for the stage. */
  stage_detail: string | null;
  /** Optional structured context for the stage. */
  stage_context: TaskStageContext | null;
  /** Timestamp string stored as the event's creation time. */
  created_at: string;
};
|
|
|
|
/**
 * Optional stage detail/context supplied when a task reaches a terminal state.
 *
 * Used by `completeTask` and `markTaskFailure`, which fall back to their own
 * defaults when a field is missing or `null`.
 */
type TaskCompletionState = {
  /** Structured context to store with the final stage. */
  context?: TaskStageContext | null;
  /** Human-readable detail to store with the final stage. */
  detail?: string | null;
};
|
|
|
|
/**
 * Minimal handle needed to insert rows: anything exposing the same `insert`
 * member as `db`. The helpers in this module pass a transaction handle here.
 */
type InsertExecutor = { insert: (typeof db)['insert'] };
|
|
|
|
/**
 * Converts a `taskRun` row into the `Task` shape exposed to callers.
 *
 * The row's columns are copied field by field (a missing `stage_context` is
 * normalised to `null`), and the `notification` field is computed from those
 * copied fields by `buildTaskNotification`.
 *
 * @param row - Row selected from (or returned by a write to) `taskRun`.
 * @returns The task including its derived notification.
 */
function toTask(row: TaskRow): Task {
  const withoutNotification = {
    id: row.id,
    user_id: row.user_id,
    task_type: row.task_type,
    status: row.status,
    stage: row.stage as TaskStage,
    stage_detail: row.stage_detail,
    stage_context: row.stage_context ?? null,
    resource_key: row.resource_key,
    notification_read_at: row.notification_read_at,
    notification_silenced_at: row.notification_silenced_at,
    priority: row.priority,
    payload: row.payload,
    result: row.result,
    error: row.error,
    attempts: row.attempts,
    max_attempts: row.max_attempts,
    workflow_run_id: row.workflow_run_id,
    created_at: row.created_at,
    updated_at: row.updated_at,
    finished_at: row.finished_at
  } satisfies Omit<Task, 'notification'>;

  const notification = buildTaskNotification(withoutNotification);

  return { ...withoutNotification, notification };
}
|
|
|
|
/**
 * Converts a `taskStageEvent` row into the `TaskStageEvent` shape.
 *
 * Only the listed columns are copied; a missing `stage_context` becomes `null`.
 *
 * @param row - Row selected from `taskStageEvent`.
 * @returns The event in its public shape.
 */
function toTaskStageEvent(row: TaskStageEventRow): TaskStageEvent {
  const {
    id,
    task_id,
    user_id,
    stage,
    stage_detail,
    stage_context,
    status,
    created_at
  } = row;

  return {
    id,
    task_id,
    user_id,
    stage: stage as TaskStage,
    stage_detail,
    stage_context: stage_context ?? null,
    status: status as TaskStatus,
    created_at
  };
}
|
|
|
|
/**
 * Maps a task status to the stage recorded for it.
 *
 * `queued`, `running` and `completed` map to the stage of the same name;
 * every other status (including `failed`) maps to `'failed'`.
 *
 * @param status - Task status to translate.
 * @returns The matching stage.
 */
function statusToStage(status: TaskStatus): TaskStage {
  if (status === 'queued' || status === 'running' || status === 'completed') {
    return status;
  }

  return 'failed';
}
|
|
|
|
/**
 * Writes one row to `taskStageEvent` through the given executor.
 *
 * Only the known event columns are forwarded, so extra properties on `input`
 * never reach the insert.
 *
 * @param executor - Handle whose `insert` is used (callers in this module pass a transaction).
 * @param input - Column values for the new event row.
 */
async function insertTaskStageEvent(executor: InsertExecutor, input: EventInsertInput) {
  const { task_id, user_id, stage, stage_detail, stage_context, status, created_at } = input;

  await executor
    .insert(taskStageEvent)
    .values({ task_id, user_id, stage, stage_detail, stage_context, status, created_at });
}
|
|
|
|
/**
 * Creates a new task in the `queued` state.
 *
 * The task row and its initial stage event are written in a single
 * transaction and share the same timestamp.
 *
 * @param input - Caller-supplied task fields.
 * @returns The newly created task.
 */
export async function createTaskRunRecord(input: CreateTaskInput) {
  const createdAt = new Date().toISOString();

  const record: typeof taskRun.$inferInsert = {
    id: input.id,
    user_id: input.user_id,
    task_type: input.task_type,
    status: 'queued',
    stage: 'queued',
    stage_detail: null,
    stage_context: null,
    resource_key: input.resource_key ?? null,
    notification_read_at: null,
    notification_silenced_at: null,
    priority: input.priority,
    payload: input.payload,
    result: null,
    error: null,
    attempts: 0,
    max_attempts: input.max_attempts,
    workflow_run_id: null,
    created_at: createdAt,
    updated_at: createdAt,
    finished_at: null
  };

  return await db.transaction(async (tx) => {
    const [inserted] = await tx.insert(taskRun).values(record).returning();

    // Record the initial "queued" stage alongside the row itself.
    await insertTaskStageEvent(tx, {
      task_id: inserted.id,
      user_id: inserted.user_id,
      stage: inserted.stage as TaskStage,
      stage_detail: inserted.stage_detail,
      stage_context: inserted.stage_context ?? null,
      status: inserted.status,
      created_at: createdAt
    });

    return toTask(inserted);
  });
}
|
|
|
|
/**
 * Outcome of `createTaskRunRecordAtomic`, discriminated by `created`.
 *
 * - `created: false` - no row was inserted and `task` is `null`.
 * - `created: true` - the row was inserted and `task` holds it.
 */
export type AtomicCreateResult =
  | { task: null; created: false }
  | { task: Task; created: true };
|
|
|
|
/** SQLite extended result code for a UNIQUE constraint violation. */
const SQLITE_CONSTRAINT_UNIQUE = 2067;

/** Symbolic form of the same result code, for errors that carry the code as a string. */
const SQLITE_CONSTRAINT_UNIQUE_NAME = 'SQLITE_CONSTRAINT_UNIQUE';

/** Maximum number of `cause` links inspected, so a cyclic chain cannot loop forever. */
const MAX_ERROR_CAUSE_DEPTH = 5;

/**
 * Reports whether a thrown value looks like a SQLite UNIQUE constraint failure.
 *
 * The value and its `cause` chain (up to `MAX_ERROR_CAUSE_DEPTH` links) are
 * inspected for:
 * - a numeric `code`, `rawCode` or `errno` equal to `SQLITE_CONSTRAINT_UNIQUE`,
 * - a string `code` equal to `'SQLITE_CONSTRAINT_UNIQUE'`,
 * - a string `message` containing `'UNIQUE constraint failed'`.
 *
 * NOTE(review): which of these properties is populated depends on the SQLite
 * driver and on whether the query layer wraps the driver error - confirm
 * against the driver this project actually uses.
 *
 * @param error - Anything caught from a failed insert.
 * @returns `true` when the value matches one of the signals above.
 */
function isUniqueConstraintError(error: unknown): boolean {
  let current: unknown = error;

  for (let depth = 0; depth < MAX_ERROR_CAUSE_DEPTH; depth += 1) {
    if (typeof current !== 'object' || current === null) {
      return false;
    }

    const candidate = current as {
      code?: unknown;
      rawCode?: unknown;
      errno?: unknown;
      message?: unknown;
      cause?: unknown;
    };

    if (
      candidate.code === SQLITE_CONSTRAINT_UNIQUE ||
      candidate.code === SQLITE_CONSTRAINT_UNIQUE_NAME ||
      candidate.rawCode === SQLITE_CONSTRAINT_UNIQUE ||
      candidate.errno === SQLITE_CONSTRAINT_UNIQUE
    ) {
      return true;
    }

    if (
      typeof candidate.message === 'string' &&
      candidate.message.includes('UNIQUE constraint failed')
    ) {
      return true;
    }

    current = candidate.cause;
  }

  return false;
}

/**
 * Tries to insert a new queued task row and, on success, its initial stage event.
 *
 * A UNIQUE constraint failure on the task insert is treated as "an equivalent
 * row already exists" and reported as `null` rather than thrown. Only the task
 * insert is guarded this way: a failure while writing the stage event always
 * propagates, so the caller never receives `null` while the freshly inserted
 * task row is still part of the transaction.
 *
 * @param tx - Transaction handle to run both inserts on.
 * @param input - Caller-supplied task fields.
 * @param now - Timestamp string used for the row and the event.
 * @returns The created task, or `null` when no row was inserted.
 * @throws Any insert error that is not a UNIQUE constraint failure on the task row.
 */
async function attemptAtomicInsert(
  tx: Parameters<Parameters<typeof db.transaction>[0]>[0],
  input: CreateTaskInput,
  now: string
): Promise<{ task: Task; created: true } | null> {
  let row: TaskRow | undefined;

  try {
    [row] = await tx
      .insert(taskRun)
      .values({
        id: input.id,
        user_id: input.user_id,
        task_type: input.task_type,
        status: 'queued',
        stage: 'queued',
        stage_detail: null,
        stage_context: null,
        resource_key: input.resource_key ?? null,
        notification_read_at: null,
        notification_silenced_at: null,
        priority: input.priority,
        payload: input.payload,
        result: null,
        error: null,
        attempts: 0,
        max_attempts: input.max_attempts,
        workflow_run_id: null,
        created_at: now,
        updated_at: now,
        finished_at: null
      })
      .returning();
  } catch (error: unknown) {
    if (isUniqueConstraintError(error)) {
      return null;
    }
    throw error;
  }

  if (!row) {
    return null;
  }

  await insertTaskStageEvent(tx, {
    task_id: row.id,
    user_id: row.user_id,
    stage: row.stage as TaskStage,
    stage_detail: row.stage_detail,
    stage_context: row.stage_context ?? null,
    status: row.status,
    created_at: now
  });

  return { task: toTask(row), created: true };
}
|
|
|
|
/**
 * Creates a task unless the insert is rejected as a duplicate.
 *
 * Runs `attemptAtomicInsert` inside a transaction and translates its `null`
 * ("nothing inserted") into the `created: false` variant of the result.
 *
 * @param input - Caller-supplied task fields.
 * @returns `{ task, created: true }` on insert, otherwise `{ task: null, created: false }`.
 */
export async function createTaskRunRecordAtomic(input: CreateTaskInput): Promise<AtomicCreateResult> {
  const now = new Date().toISOString();

  return await db.transaction(async (tx): Promise<AtomicCreateResult> => {
    const inserted = await attemptAtomicInsert(tx, input, now);

    return inserted ?? { task: null, created: false };
  });
}
|
|
|
|
/**
 * Stores the workflow run id on a task and bumps its `updated_at`.
 *
 * @param taskId - Id of the task row to update.
 * @param workflowRunId - Workflow run id to associate with the task.
 */
export async function setTaskWorkflowRunId(taskId: string, workflowRunId: string) {
  const changes = {
    workflow_run_id: workflowRunId,
    updated_at: new Date().toISOString()
  };

  await db.update(taskRun).set(changes).where(eq(taskRun.id, taskId));
}
|
|
|
|
/**
 * Loads a task by id, restricted to rows owned by the given user.
 *
 * @param taskId - Id of the task to load.
 * @param userId - Owner the task must belong to.
 * @returns The task, or `null` when no matching row exists.
 */
export async function getTaskByIdForUser(taskId: string, userId: string) {
  const ownedTask = and(eq(taskRun.id, taskId), eq(taskRun.user_id, userId));
  const matches = await db.select().from(taskRun).where(ownedTask).limit(1);
  const match = matches[0];

  if (!match) {
    return null;
  }

  return toTask(match);
}
|
|
|
|
/** Page size used when the caller omits `limit` or passes something that is not a number. */
const DEFAULT_TASK_LIST_LIMIT = 20;

/** Largest page size `listRecentTasksForUser` will request. */
const MAX_TASK_LIST_LIMIT = 200;

/**
 * Turns a requested page size into a safe integer in `[1, MAX_TASK_LIST_LIMIT]`.
 *
 * Fractions are truncated and out-of-range values (including +/-Infinity) are
 * clamped. `NaN` cannot be clamped - `Math.min`/`Math.max` just propagate it -
 * so it falls back to `DEFAULT_TASK_LIST_LIMIT` instead of reaching the query.
 */
function normalizeTaskListLimit(limit: number): number {
  const truncated = Math.trunc(limit);

  if (Number.isNaN(truncated)) {
    return DEFAULT_TASK_LIST_LIMIT;
  }

  return Math.min(Math.max(truncated, 1), MAX_TASK_LIST_LIMIT);
}

/**
 * Lists a user's tasks, most recently updated first.
 *
 * @param userId - Owner whose tasks are listed.
 * @param limit - Requested number of tasks; normalised to an integer between 1 and 200 (default 20).
 * @param statuses - Optional status filter; ignored when omitted or empty.
 * @returns The matching tasks ordered by `updated_at`, then `created_at`, both descending.
 */
export async function listRecentTasksForUser(
  userId: string,
  limit = 20,
  statuses?: TaskStatus[]
) {
  const safeLimit = normalizeTaskListLimit(limit);

  const ownedByUser = eq(taskRun.user_id, userId);
  const filter = statuses && statuses.length > 0
    ? and(ownedByUser, inArray(taskRun.status, statuses))
    : ownedByUser;

  const rows = await db
    .select()
    .from(taskRun)
    .where(filter)
    .orderBy(desc(taskRun.updated_at), desc(taskRun.created_at))
    .limit(safeLimit);

  return rows.map(toTask);
}
|
|
|
|
/**
 * Counts task rows per status across all users.
 *
 * @returns An object keyed by status with the number of rows in that status;
 * statuses with no rows are absent.
 */
export async function countTasksByStatus() {
  const grouped = await db
    .select({
      status: taskRun.status,
      count: sql<string>`count(*)`
    })
    .from(taskRun)
    .groupBy(taskRun.status);

  // The count is typed as a string by the query above, so convert it explicitly.
  return grouped.reduce<Record<string, number>>((totals, { status, count }) => {
    totals[status] = Number(count);
    return totals;
  }, {});
}
|
|
|
|
/**
 * Finds the user's most recently updated task of the given type and resource
 * key that is still `queued` or `running`.
 *
 * @param userId - Owner of the task.
 * @param taskType - Task type to match.
 * @param resourceKey - Resource key to match.
 * @returns The in-flight task, or `null` when there is none.
 */
export async function findInFlightTaskByResourceKey(
  userId: string,
  taskType: TaskType,
  resourceKey: string
) {
  const inFlightForResource = and(
    eq(taskRun.user_id, userId),
    eq(taskRun.task_type, taskType),
    eq(taskRun.resource_key, resourceKey),
    inArray(taskRun.status, ['queued', 'running'])
  );

  const candidates = await db
    .select()
    .from(taskRun)
    .where(inFlightForResource)
    .orderBy(desc(taskRun.updated_at), desc(taskRun.created_at))
    .limit(1);

  const latest = candidates[0];

  if (!latest) {
    return null;
  }

  return toTask(latest);
}
|
|
|
|
/**
 * Moves a task to `running`, increments its attempt counter, and records a
 * stage event - all in one transaction.
 *
 * @param taskId - Id of the task to start.
 * @returns The updated task, or `null` when no row has that id.
 */
export async function markTaskRunning(taskId: string) {
  const startedAt = new Date().toISOString();

  return await db.transaction(async (tx) => {
    const updatedRows = await tx
      .update(taskRun)
      .set({
        status: 'running',
        stage: 'running',
        stage_detail: 'Workflow task is now running',
        stage_context: null,
        // Incremented in SQL so the new value is based on the stored one.
        attempts: sql`${taskRun.attempts} + 1`,
        updated_at: startedAt,
        finished_at: null
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    const updated = updatedRows[0];

    if (!updated) {
      return null;
    }

    await insertTaskStageEvent(tx, {
      task_id: updated.id,
      user_id: updated.user_id,
      stage: updated.stage as TaskStage,
      stage_detail: updated.stage_detail,
      stage_context: updated.stage_context ?? null,
      status: updated.status,
      created_at: startedAt
    });

    return toTask(updated);
  });
}
|
|
|
|
/**
 * Updates a task's stage, detail and context without touching its status.
 *
 * A stage event is recorded only when the stage value actually changes;
 * detail/context-only updates modify the row but add no event.
 *
 * @param taskId - Id of the task to update.
 * @param stage - New stage.
 * @param detail - New stage detail (defaults to `null`).
 * @param context - New stage context (defaults to `null`).
 * @returns The updated task, or `null` when no row has that id.
 */
export async function updateTaskStage(
  taskId: string,
  stage: TaskStage,
  detail: string | null = null,
  context: TaskStageContext | null = null
) {
  const changedAt = new Date().toISOString();

  return await db.transaction(async (tx) => {
    // Read the previous stage first so we can tell whether it changes.
    const [before] = await tx
      .select()
      .from(taskRun)
      .where(eq(taskRun.id, taskId))
      .limit(1);

    if (!before) {
      return null;
    }

    const [after] = await tx
      .update(taskRun)
      .set({
        stage,
        stage_detail: detail,
        stage_context: context,
        updated_at: changedAt
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    if (!after) {
      return null;
    }

    const stageChanged = before.stage !== stage;

    if (stageChanged) {
      await insertTaskStageEvent(tx, {
        task_id: after.id,
        user_id: after.user_id,
        stage: after.stage as TaskStage,
        stage_detail: after.stage_detail,
        stage_context: after.stage_context ?? null,
        status: after.status,
        created_at: changedAt
      });
    }

    return toTask(after);
  });
}
|
|
|
|
/**
 * Marks a task as completed, stores its result, clears any error, and records
 * a stage event - all in one transaction.
 *
 * @param taskId - Id of the task to complete.
 * @param result - Result payload stored on the task.
 * @param completion - Optional stage detail/context; the detail defaults to a
 * generic success message and the context to `null`.
 * @returns The updated task, or `null` when no row has that id.
 */
export async function completeTask(
  taskId: string,
  result: Record<string, unknown>,
  completion: TaskCompletionState = {}
) {
  const finishedAt = new Date().toISOString();
  const stageDetail = completion.detail ?? 'Task finished successfully.';
  const stageContext = completion.context ?? null;

  return await db.transaction(async (tx) => {
    const updatedRows = await tx
      .update(taskRun)
      .set({
        status: 'completed',
        stage: 'completed',
        stage_detail: stageDetail,
        stage_context: stageContext,
        result,
        error: null,
        updated_at: finishedAt,
        finished_at: finishedAt
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    const updated = updatedRows[0];

    if (!updated) {
      return null;
    }

    await insertTaskStageEvent(tx, {
      task_id: updated.id,
      user_id: updated.user_id,
      stage: updated.stage as TaskStage,
      stage_detail: updated.stage_detail,
      stage_context: updated.stage_context ?? null,
      status: updated.status,
      created_at: finishedAt
    });

    return toTask(updated);
  });
}
|
|
|
|
/**
 * Marks a task as failed, stores the failure reason as its error, and records
 * a stage event - all in one transaction.
 *
 * @param taskId - Id of the task that failed.
 * @param reason - Failure reason; stored as `error` and used as the stage
 * detail when `failure.detail` is not provided.
 * @param stage - Stage to record for the failure (defaults to `'failed'`).
 * @param failure - Optional stage detail/context overrides.
 * @returns The updated task, or `null` when no row has that id.
 */
export async function markTaskFailure(
  taskId: string,
  reason: string,
  stage: TaskStage = 'failed',
  failure: TaskCompletionState = {}
) {
  const failedAt = new Date().toISOString();
  const stageDetail = failure.detail ?? reason;
  const stageContext = failure.context ?? null;

  return await db.transaction(async (tx) => {
    const updatedRows = await tx
      .update(taskRun)
      .set({
        status: 'failed',
        stage,
        stage_detail: stageDetail,
        stage_context: stageContext,
        error: reason,
        updated_at: failedAt,
        finished_at: failedAt
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    const updated = updatedRows[0];

    if (!updated) {
      return null;
    }

    await insertTaskStageEvent(tx, {
      task_id: updated.id,
      user_id: updated.user_id,
      stage: updated.stage as TaskStage,
      stage_detail: updated.stage_detail,
      stage_context: updated.stage_context ?? null,
      status: updated.status,
      created_at: failedAt
    });

    return toTask(updated);
  });
}
|
|
|
|
/**
 * Synchronises a task row with a status reported by the workflow.
 *
 * The target stage, error and detail are derived from `status`. When the
 * stored row already matches that target state exactly, it is returned
 * untouched and no stage event is written. Otherwise the row is updated
 * (clearing `stage_context`) and a stage event is recorded in the same
 * transaction.
 *
 * @param taskId - Id of the task to update.
 * @param status - Status reported by the workflow.
 * @param error - Failure reason; only used when `status` is `'failed'`.
 * @param detail - Stage detail override, forwarded to `nextStatusDetail`.
 * @returns The current or updated task, or `null` when no row has that id.
 */
export async function setTaskStatusFromWorkflow(
  taskId: string,
  status: TaskStatus,
  error?: string | null,
  detail?: string | null
) {
  const isTerminal = status === 'completed' || status === 'failed';
  const targetStage = statusToStage(status);
  const targetError = status === 'failed' ? (error ?? 'Workflow run failed') : null;
  const targetDetail = nextStatusDetail(status, targetError, detail);

  return await db.transaction(async (tx) => {
    const [current] = await tx
      .select()
      .from(taskRun)
      .where(eq(taskRun.id, taskId))
      .limit(1);

    if (!current) {
      return null;
    }

    // Terminal statuses must already carry a finish time; others must not.
    const finishedAtMatches = isTerminal
      ? current.finished_at !== null
      : current.finished_at === null;

    const alreadyInTargetState =
      current.status === status &&
      current.stage === targetStage &&
      (current.error ?? null) === targetError &&
      (current.stage_detail ?? null) === (targetDetail ?? null) &&
      (current.stage_context ?? null) === null &&
      finishedAtMatches;

    if (alreadyInTargetState) {
      return toTask(current);
    }

    const now = new Date().toISOString();

    const [updated] = await tx
      .update(taskRun)
      .set({
        status,
        stage: targetStage,
        stage_detail: targetDetail,
        stage_context: null,
        error: targetError,
        updated_at: now,
        finished_at: isTerminal ? now : null
      })
      .where(eq(taskRun.id, taskId))
      .returning();

    if (!updated) {
      return null;
    }

    await insertTaskStageEvent(tx, {
      task_id: updated.id,
      user_id: updated.user_id,
      stage: updated.stage as TaskStage,
      stage_detail: updated.stage_detail,
      stage_context: updated.stage_context ?? null,
      status: updated.status,
      created_at: now
    });

    return toTask(updated);
  });
}
|
|
|
|
/**
 * Applies read/silenced changes to a task's notification columns.
 *
 * - `read`: `true` stamps `notification_read_at`, `false` clears it.
 * - `silenced`: `true` stamps `notification_silenced_at` and also marks the
 *   task read (overriding `read: false` in the same call); `false` clears the
 *   silenced timestamp only.
 * - When neither field is a boolean, nothing is written and the current task
 *   is returned.
 *
 * @param taskId - Id of the task to update.
 * @param userId - Owner the task must belong to.
 * @param input - Requested notification changes.
 * @returns The resulting task, or `null` when the user has no such task.
 */
export async function updateTaskNotificationState(
  taskId: string,
  userId: string,
  input: UpdateTaskNotificationStateInput
) {
  const changesRead = typeof input.read === 'boolean';
  const changesSilenced = typeof input.silenced === 'boolean';

  if (!changesRead && !changesSilenced) {
    return await getTaskByIdForUser(taskId, userId);
  }

  const now = new Date().toISOString();
  const patch: Partial<typeof taskRun.$inferInsert> = { updated_at: now };

  if (changesRead) {
    patch.notification_read_at = input.read ? now : null;
  }

  if (changesSilenced) {
    patch.notification_silenced_at = input.silenced ? now : null;

    // Silencing a notification implies it has been read.
    if (input.silenced) {
      patch.notification_read_at = now;
    }
  }

  const updatedRows = await db
    .update(taskRun)
    .set(patch)
    .where(and(eq(taskRun.id, taskId), eq(taskRun.user_id, userId)))
    .returning();

  const updated = updatedRows[0];

  return updated ? toTask(updated) : null;
}
|
|
|
|
/**
 * Picks the stage detail text to store for a workflow-reported status.
 *
 * - `failed`: the first non-nullish of `detail`, `error`, then a generic message.
 * - `completed` / `running`: a fixed message (`detail` is not consulted).
 * - anything else: `null`.
 *
 * @param status - Status being applied.
 * @param error - Failure reason, used as a fallback for `failed`.
 * @param detail - Preferred detail text for `failed`.
 * @returns The detail text, or `null` when the status has none.
 */
function nextStatusDetail(status: TaskStatus, error?: string | null, detail?: string | null) {
  switch (status) {
    case 'failed':
      return detail ?? error ?? 'Workflow run failed';
    case 'completed':
      return 'Workflow run completed.';
    case 'running':
      return 'Workflow task is now running';
    default:
      return null;
  }
}
|
|
|
|
/**
 * Lists the stage events of a task owned by the given user, oldest first.
 *
 * @param taskId - Id of the task whose events are listed.
 * @param userId - Owner the events must belong to.
 * @returns Events ordered by `created_at`, then `id`, both ascending.
 */
export async function listTaskStageEventsForTask(taskId: string, userId: string) {
  const forUsersTask = and(
    eq(taskStageEvent.task_id, taskId),
    eq(taskStageEvent.user_id, userId)
  );

  const events = await db
    .select()
    .from(taskStageEvent)
    .where(forUsersTask)
    .orderBy(asc(taskStageEvent.created_at), asc(taskStageEvent.id));

  return events.map(toTaskStageEvent);
}
|
|
|
|
/**
 * Loads a task by id without any ownership check.
 *
 * @param taskId - Id of the task to load.
 * @returns The task, or `null` when no row has that id.
 */
export async function getTaskById(taskId: string) {
  const matches = await db
    .select()
    .from(taskRun)
    .where(eq(taskRun.id, taskId))
    .limit(1);

  const match = matches[0];

  if (!match) {
    return null;
  }

  return toTask(match);
}
|