From ed4420b8db19bf9ee39e11277bc5b09db0931269 Mon Sep 17 00:00:00 2001 From: francy51 Date: Sun, 15 Mar 2026 14:40:38 -0400 Subject: [PATCH] Add atomic task deduplication with partial unique index - Add partial unique index for active resource-scoped tasks - Implement createTaskRunRecordAtomic for race-free task creation - Update findOrEnqueueTask to use atomic insert first - Add tests for concurrent task creation deduplication --- drizzle/0013_task_active_resource_unique.sql | 3 + lib/server/db/schema.ts | 6 ++ lib/server/repos/tasks.test.ts | 105 ++++++++++++++++++- lib/server/repos/tasks.ts | 77 ++++++++++++++ lib/server/tasks.ts | 40 ++++++- 5 files changed, 225 insertions(+), 6 deletions(-) create mode 100644 drizzle/0013_task_active_resource_unique.sql diff --git a/drizzle/0013_task_active_resource_unique.sql b/drizzle/0013_task_active_resource_unique.sql new file mode 100644 index 0000000..81afc75 --- /dev/null +++ b/drizzle/0013_task_active_resource_unique.sql @@ -0,0 +1,3 @@ +CREATE UNIQUE INDEX IF NOT EXISTS `task_active_resource_uidx` +ON `task_run` (`user_id`, `task_type`, `resource_key`) +WHERE `resource_key` IS NOT NULL AND `status` IN ('queued', 'running'); diff --git a/lib/server/db/schema.ts b/lib/server/db/schema.ts index 44b4e1f..6189d9d 100644 --- a/lib/server/db/schema.ts +++ b/lib/server/db/schema.ts @@ -668,6 +668,12 @@ export const taskRun = sqliteTable('task_run', { taskWorkflowRunUnique: uniqueIndex('task_workflow_run_uidx').on(table.workflow_run_id) })); +// Note: Partial unique index for active resource-scoped task deduplication is created via +// migration 0013_task_active_resource_unique.sql. SQLite does not support partial indexes +// in drizzle schema DSL, so the index is managed separately: +// CREATE UNIQUE INDEX task_active_resource_uidx ON task_run (user_id, task_type, resource_key) +// WHERE resource_key IS NOT NULL AND status IN ('queued', 'running'); + export const taskStageEvent = sqliteTable('task_stage_event', { id: integer('id').primaryKey({ autoIncrement: true }), task_id: text('task_id').notNull().references(() => taskRun.id, { onDelete: 'cascade' }), diff --git a/lib/server/repos/tasks.test.ts b/lib/server/repos/tasks.test.ts index e1c1b32..c8ac581 100644 --- a/lib/server/repos/tasks.test.ts +++ b/lib/server/repos/tasks.test.ts @@ -68,7 +68,8 @@ describe('task repos', () => { '0007_company_financial_bundles.sql', '0008_research_workspace.sql', '0009_task_notification_context.sql', - '0012_company_overview_cache.sql' + '0012_company_overview_cache.sql', + '0013_task_active_resource_unique.sql' ]) { applyMigration(sqliteClient, file); } @@ -222,4 +223,106 @@ describe('task repos', () => { expect(failed?.error).toContain('Search indexing could not generate embeddings'); expect(failed?.stage_context?.progress?.current).toBe(2); }); + + it('atomically deduplicates concurrent task creation for same resource', async () => { + if (!tasksRepo) { + throw new Error('tasks repo not initialized'); + } + + const resourceKey = 'sync_filings:AAPL'; + const concurrentCount = 10; + + const results = await Promise.all( + Array.from({ length: concurrentCount }, (_, i) => + tasksRepo!.createTaskRunRecordAtomic({ + id: `task-race-${i}`, + user_id: TEST_USER_ID, + task_type: 'sync_filings', + payload: { ticker: 'AAPL' }, + priority: 50, + max_attempts: 3, + resource_key: resourceKey + }) + ) + ); + + const createdResults = results.filter((r) => r.created); + const conflictResults = results.filter((r) => !r.created); + + expect(createdResults.length).toBe(1); + expect(conflictResults.length).toBe(concurrentCount - 1); + expect(createdResults[0]?.task.resource_key).toBe(resourceKey); + expect(createdResults[0]?.task.status).toBe('queued'); + }); + + it('allows creating new task for same resource after previous task completes', async () => { + if (!tasksRepo) { + throw new Error('tasks repo not initialized'); + } + + const resourceKey = 'sync_filings:MSFT'; + + const first = await tasksRepo.createTaskRunRecordAtomic({ + id: 'task-first-msft', + user_id: TEST_USER_ID, + task_type: 'sync_filings', + payload: { ticker: 'MSFT' }, + priority: 50, + max_attempts: 3, + resource_key: resourceKey + }); + + expect(first.created).toBe(true); + if (!first.created) throw new Error('Expected task to be created'); + + await tasksRepo.completeTask(first.task.id, { ticker: 'MSFT' }); + + const second = await tasksRepo.createTaskRunRecordAtomic({ + id: 'task-second-msft', + user_id: TEST_USER_ID, + task_type: 'sync_filings', + payload: { ticker: 'MSFT' }, + priority: 50, + max_attempts: 3, + resource_key: resourceKey + }); + + expect(second.created).toBe(true); + if (!second.created) throw new Error('Expected second task to be created'); + expect(second.task.id).not.toBe(first.task.id); + }); + + it('allows creating tasks without resource key without deduplication', async () => { + if (!tasksRepo) { + throw new Error('tasks repo not initialized'); + } + + const results = await Promise.all([ + tasksRepo.createTaskRunRecordAtomic({ + id: 'task-nokey-1', + user_id: TEST_USER_ID, + task_type: 'sync_filings', + payload: { ticker: 'GOOGL' }, + priority: 50, + max_attempts: 3, + resource_key: null + }), + tasksRepo.createTaskRunRecordAtomic({ + id: 'task-nokey-2', + user_id: TEST_USER_ID, + task_type: 'sync_filings', + payload: { ticker: 'GOOGL' }, + priority: 50, + max_attempts: 3, + resource_key: null + }) + ]); + + expect(results[0]?.created).toBe(true); + expect(results[1]?.created).toBe(true); + if (!results[0]?.created || !results[1]?.created) { + throw new Error('Expected both tasks to be created'); + } + expect(results[0].task.id).not.toBe(results[1].task.id); + }); }); diff --git a/lib/server/repos/tasks.ts b/lib/server/repos/tasks.ts index 86fd1c7..438c220 100644 --- a/lib/server/repos/tasks.ts +++ b/lib/server/repos/tasks.ts @@ -153,6 +153,83 @@ export async function createTaskRunRecord(input: CreateTaskInput) { }); } +export type AtomicCreateResult = + | { task: Task; created: true } + | { task: null; created: false }; + +const SQLITE_CONSTRAINT_UNIQUE = 2067; + +async function attemptAtomicInsert( + tx: Parameters[0]>[0], + input: CreateTaskInput, + now: string +): Promise<{ task: Task; created: true } | null> { + try { + const [row] = await tx + .insert(taskRun) + .values({ + id: input.id, + user_id: input.user_id, + task_type: input.task_type, + status: 'queued', + stage: 'queued', + stage_detail: null, + stage_context: null, + resource_key: input.resource_key ?? null, + notification_read_at: null, + notification_silenced_at: null, + priority: input.priority, + payload: input.payload, + result: null, + error: null, + attempts: 0, + max_attempts: input.max_attempts, + workflow_run_id: null, + created_at: now, + updated_at: now, + finished_at: null + }) + .returning(); + + if (!row) { + return null; + } + + await insertTaskStageEvent(tx, { + task_id: row.id, + user_id: row.user_id, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + stage_context: row.stage_context ?? null, + status: row.status, + created_at: now + }); + + return { task: toTask(row), created: true }; + } catch (error) { + const sqliteError = error as { code?: number; message?: string }; + if ( + sqliteError.code === SQLITE_CONSTRAINT_UNIQUE || + sqliteError.message?.includes('UNIQUE constraint failed') + ) { + return null; + } + throw error; + } +} + +export async function createTaskRunRecordAtomic(input: CreateTaskInput): Promise { + const now = new Date().toISOString(); + + return await db.transaction(async (tx) => { + const result = await attemptAtomicInsert(tx, input, now); + if (result) { + return result; + } + return { task: null, created: false }; + }); +} + export async function setTaskWorkflowRunId(taskId: string, workflowRunId: string) { await db .update(taskRun) diff --git a/lib/server/tasks.ts b/lib/server/tasks.ts index eaa8d4a..80a3b81 100644 --- a/lib/server/tasks.ts +++ b/lib/server/tasks.ts @@ -7,6 +7,7 @@ import { describeTaskFailure } from '@/lib/server/task-errors'; import { countTasksByStatus, createTaskRunRecord, + createTaskRunRecordAtomic, findInFlightTaskByResourceKey, getTaskByIdForUser, listTaskStageEventsForTask, @@ -134,6 +135,38 @@ export async function findOrEnqueueTask(input: EnqueueTaskInput) { return await enqueueTask(input); } + const taskId = randomUUID(); + const result = await createTaskRunRecordAtomic({ + id: taskId, + user_id: input.userId, + task_type: input.taskType, + payload: input.payload ?? {}, + priority: input.priority ?? 50, + max_attempts: input.maxAttempts ?? 3, + resource_key: input.resourceKey + }); + + if (result.created) { + try { + const run = await start(runTaskWorkflow, [result.task.id]); + await setTaskWorkflowRunId(result.task.id, run.runId); + + return { + ...result.task, + workflow_run_id: run.runId + } satisfies Task; + } catch (error) { + const failure = describeTaskFailure(result.task, 'Failed to start workflow'); + await markTaskFailure(result.task.id, failure.detail, 'failed', { + detail: failure.summary + }); + + const wrapped = new Error(failure.detail); + (wrapped as Error & { cause?: unknown }).cause = error; + throw wrapped; + } + } + const existingTask = await findInFlightTaskByResourceKey( input.userId, input.taskType, @@ -141,13 +174,10 @@ export async function findOrEnqueueTask(input: EnqueueTaskInput) { ); if (existingTask) { - const reconciledTask = await reconcileTaskWithWorkflow(existingTask); - if (reconciledTask.status === 'queued' || reconciledTask.status === 'running') { - return reconciledTask; - } + return await reconcileTaskWithWorkflow(existingTask); } - return await enqueueTask(input); + throw new Error('Task deduplication conflict detected but no in-flight task found'); } export async function findInFlightTask(userId: string, taskType: TaskType, resourceKey: string) {