Collapse filing sync notifications into one batch surface
This commit is contained in:
@@ -73,6 +73,7 @@ import {
|
||||
import { answerSearchQuery, searchKnowledgeBase } from '@/lib/server/search';
|
||||
import {
|
||||
enqueueTask,
|
||||
findOrEnqueueTask,
|
||||
findInFlightTask,
|
||||
getTaskById,
|
||||
getTaskTimeline,
|
||||
@@ -340,7 +341,7 @@ async function queueAutoFilingSync(
|
||||
metadata?: { category?: unknown; tags?: unknown }
|
||||
) {
|
||||
try {
|
||||
await enqueueTask({
|
||||
await findOrEnqueueTask({
|
||||
userId,
|
||||
taskType: 'sync_filings',
|
||||
payload: buildSyncFilingsPayload({
|
||||
@@ -1459,7 +1460,7 @@ export const app = new Elysia({ prefix: '/api' })
|
||||
if (shouldQueueSync) {
|
||||
try {
|
||||
const watchlistItem = await getWatchlistItemByTicker(session.user.id, ticker);
|
||||
await enqueueTask({
|
||||
await findOrEnqueueTask({
|
||||
userId: session.user.id,
|
||||
taskType: 'sync_filings',
|
||||
payload: buildSyncFilingsPayload({
|
||||
@@ -1661,7 +1662,7 @@ export const app = new Elysia({ prefix: '/api' })
|
||||
|
||||
try {
|
||||
const limit = typeof payload.limit === 'number' ? payload.limit : Number(payload.limit);
|
||||
const task = await enqueueTask({
|
||||
const task = await findOrEnqueueTask({
|
||||
userId: session.user.id,
|
||||
taskType: 'sync_filings',
|
||||
payload: buildSyncFilingsPayload({
|
||||
|
||||
@@ -467,6 +467,60 @@ if (process.env.RUN_TASK_WORKFLOW_E2E === '1') {
|
||||
expect(task.payload.tags).toEqual(['semis', 'ai']);
|
||||
});
|
||||
|
||||
it('reuses the same in-flight filing sync task for repeated same-ticker requests', async () => {
|
||||
const first = await jsonRequest('POST', '/api/filings/sync', {
|
||||
ticker: 'NVDA',
|
||||
limit: 20
|
||||
});
|
||||
const second = await jsonRequest('POST', '/api/filings/sync', {
|
||||
ticker: 'nvda',
|
||||
limit: 20
|
||||
});
|
||||
|
||||
expect(first.response.status).toBe(200);
|
||||
expect(second.response.status).toBe(200);
|
||||
|
||||
const firstTask = (first.json as { task: { id: string } }).task;
|
||||
const secondTask = (second.json as { task: { id: string } }).task;
|
||||
|
||||
expect(secondTask.id).toBe(firstTask.id);
|
||||
|
||||
const tasksResponse = await jsonRequest('GET', '/api/tasks?limit=10&status=queued&status=running');
|
||||
expect(tasksResponse.response.status).toBe(200);
|
||||
|
||||
const tasks = (tasksResponse.json as {
|
||||
tasks: Array<{ id: string; task_type: string; payload: { ticker?: string } }>;
|
||||
}).tasks.filter((task) => task.task_type === 'sync_filings' && task.payload.ticker === 'NVDA');
|
||||
|
||||
expect(tasks).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('lets different tickers queue independent filing sync tasks', async () => {
|
||||
const nvda = await jsonRequest('POST', '/api/filings/sync', { ticker: 'NVDA', limit: 20 });
|
||||
const msft = await jsonRequest('POST', '/api/filings/sync', { ticker: 'MSFT', limit: 20 });
|
||||
const aapl = await jsonRequest('POST', '/api/filings/sync', { ticker: 'AAPL', limit: 20 });
|
||||
|
||||
const ids = [
|
||||
(nvda.json as { task: { id: string } }).task.id,
|
||||
(msft.json as { task: { id: string } }).task.id,
|
||||
(aapl.json as { task: { id: string } }).task.id
|
||||
];
|
||||
|
||||
expect(new Set(ids).size).toBe(3);
|
||||
|
||||
const tasksResponse = await jsonRequest('GET', '/api/tasks?limit=10&status=queued&status=running');
|
||||
expect(tasksResponse.response.status).toBe(200);
|
||||
|
||||
const syncTickers = (tasksResponse.json as {
|
||||
tasks: Array<{ task_type: string; payload: { ticker?: string } }>;
|
||||
}).tasks
|
||||
.filter((task) => task.task_type === 'sync_filings')
|
||||
.map((task) => task.payload.ticker)
|
||||
.filter((ticker): ticker is string => typeof ticker === 'string');
|
||||
|
||||
expect(syncTickers.sort()).toEqual(['AAPL', 'MSFT', 'NVDA']);
|
||||
});
|
||||
|
||||
it('scopes the filings endpoint by ticker while leaving the global endpoint mixed', async () => {
|
||||
if (!sqliteClient) {
|
||||
throw new Error('sqlite client not initialized');
|
||||
|
||||
@@ -129,6 +129,27 @@ export async function enqueueTask(input: EnqueueTaskInput) {
|
||||
}
|
||||
}
|
||||
|
||||
export async function findOrEnqueueTask(input: EnqueueTaskInput) {
|
||||
if (!input.resourceKey) {
|
||||
return await enqueueTask(input);
|
||||
}
|
||||
|
||||
const existingTask = await findInFlightTaskByResourceKey(
|
||||
input.userId,
|
||||
input.taskType,
|
||||
input.resourceKey
|
||||
);
|
||||
|
||||
if (existingTask) {
|
||||
const reconciledTask = await reconcileTaskWithWorkflow(existingTask);
|
||||
if (reconciledTask.status === 'queued' || reconciledTask.status === 'running') {
|
||||
return reconciledTask;
|
||||
}
|
||||
}
|
||||
|
||||
return await enqueueTask(input);
|
||||
}
|
||||
|
||||
export async function findInFlightTask(userId: string, taskType: TaskType, resourceKey: string) {
|
||||
const task = await findInFlightTaskByResourceKey(userId, taskType, resourceKey);
|
||||
|
||||
|
||||
251
lib/task-notification-entries.test.ts
Normal file
251
lib/task-notification-entries.test.ts
Normal file
@@ -0,0 +1,251 @@
|
||||
import { describe, expect, it } from 'bun:test';
|
||||
import { buildTaskNotification } from '@/lib/server/task-notifications';
|
||||
import {
|
||||
buildNotificationEntries,
|
||||
notificationEntrySignature,
|
||||
type FilingSyncBatchState
|
||||
} from '@/lib/task-notification-entries';
|
||||
import type { Task } from '@/lib/types';
|
||||
|
||||
function makeTask(overrides: Partial<Omit<Task, 'notification'>> = {}): Task {
|
||||
const task = {
|
||||
id: 'task-1',
|
||||
user_id: 'user-1',
|
||||
task_type: 'sync_filings',
|
||||
status: 'running',
|
||||
stage: 'sync.extract_taxonomy',
|
||||
stage_detail: 'Extracting taxonomy for NVDA',
|
||||
stage_context: {
|
||||
progress: {
|
||||
current: 2,
|
||||
total: 5,
|
||||
unit: 'filings'
|
||||
},
|
||||
counters: {
|
||||
fetched: 5,
|
||||
inserted: 2,
|
||||
updated: 1,
|
||||
hydrated: 1,
|
||||
failed: 0
|
||||
},
|
||||
subject: {
|
||||
ticker: 'NVDA',
|
||||
accessionNumber: '0000000000-26-000001'
|
||||
}
|
||||
},
|
||||
resource_key: 'sync_filings:NVDA',
|
||||
notification_read_at: null,
|
||||
notification_silenced_at: null,
|
||||
priority: 50,
|
||||
payload: {
|
||||
ticker: 'NVDA',
|
||||
limit: 20
|
||||
},
|
||||
result: null,
|
||||
error: null,
|
||||
attempts: 1,
|
||||
max_attempts: 3,
|
||||
workflow_run_id: 'run-1',
|
||||
created_at: '2026-03-14T10:00:00.000Z',
|
||||
updated_at: '2026-03-14T10:05:00.000Z',
|
||||
finished_at: null,
|
||||
...overrides
|
||||
} satisfies Omit<Task, 'notification'>;
|
||||
|
||||
return {
|
||||
...task,
|
||||
notification: buildTaskNotification(task)
|
||||
};
|
||||
}
|
||||
|
||||
function batchState(overrides: Partial<FilingSyncBatchState> = {}): FilingSyncBatchState {
|
||||
return {
|
||||
active: true,
|
||||
taskIds: ['task-1', 'task-2'],
|
||||
latestTaskId: 'task-2',
|
||||
startedAt: '2026-03-14T10:00:00.000Z',
|
||||
finishedAt: null,
|
||||
terminalVisible: false,
|
||||
...overrides
|
||||
};
|
||||
}
|
||||
|
||||
describe('task notification entries', () => {
|
||||
it('collapses multiple active filing sync tasks into one aggregate entry', () => {
|
||||
const first = makeTask();
|
||||
const second = makeTask({
|
||||
id: 'task-2',
|
||||
resource_key: 'sync_filings:MSFT',
|
||||
payload: { ticker: 'MSFT', limit: 20 },
|
||||
stage_context: {
|
||||
progress: { current: 3, total: 5, unit: 'filings' },
|
||||
counters: { fetched: 4, inserted: 1, updated: 2, hydrated: 0, failed: 0 },
|
||||
subject: { ticker: 'MSFT', accessionNumber: '0000000000-26-000002' }
|
||||
},
|
||||
updated_at: '2026-03-14T10:06:00.000Z'
|
||||
});
|
||||
|
||||
const entries = buildNotificationEntries({
|
||||
activeTasks: [first, second],
|
||||
finishedTasks: [],
|
||||
filingSyncBatch: batchState()
|
||||
});
|
||||
|
||||
const filingEntries = entries.filter((entry) => entry.kind === 'filing_sync_batch');
|
||||
expect(filingEntries).toHaveLength(1);
|
||||
expect(filingEntries[0]).toMatchObject({
|
||||
id: 'filing-sync:active',
|
||||
status: 'running',
|
||||
statusLine: 'Syncing filings for 2 tickers',
|
||||
detailLine: '2 running • 0 queued',
|
||||
primaryTaskId: 'task-2'
|
||||
});
|
||||
expect(filingEntries[0]?.stats).toEqual([
|
||||
{ label: 'Fetched', value: '9' },
|
||||
{ label: 'Inserted', value: '3' },
|
||||
{ label: 'Updated', value: '3' },
|
||||
{ label: 'Hydrated', value: '1' },
|
||||
{ label: 'Failed', value: '0' }
|
||||
]);
|
||||
});
|
||||
|
||||
it('builds active filing sync detail from mixed queued and running tasks', () => {
|
||||
const running = makeTask();
|
||||
const queued = makeTask({
|
||||
id: 'task-2',
|
||||
status: 'queued',
|
||||
stage: 'queued',
|
||||
stage_detail: 'Queued for filings sync',
|
||||
resource_key: 'sync_filings:AAPL',
|
||||
payload: { ticker: 'AAPL', limit: 20 },
|
||||
stage_context: {
|
||||
progress: { current: 0, total: 5, unit: 'filings' },
|
||||
counters: { fetched: 0, inserted: 0, updated: 0, hydrated: 0, failed: 0 },
|
||||
subject: { ticker: 'AAPL', accessionNumber: '0000000000-26-000003' }
|
||||
},
|
||||
updated_at: '2026-03-14T10:04:00.000Z'
|
||||
});
|
||||
|
||||
const entry = buildNotificationEntries({
|
||||
activeTasks: [running, queued],
|
||||
finishedTasks: [],
|
||||
filingSyncBatch: batchState()
|
||||
}).find((candidate) => candidate.kind === 'filing_sync_batch');
|
||||
|
||||
expect(entry?.detailLine).toBe('1 running • 1 queued');
|
||||
});
|
||||
|
||||
it('keeps one terminal filing sync summary for mixed completion outcomes', () => {
|
||||
const completed = makeTask({
|
||||
status: 'completed',
|
||||
stage: 'completed',
|
||||
stage_detail: 'Completed sync for NVDA',
|
||||
result: {
|
||||
ticker: 'NVDA',
|
||||
fetched: 5,
|
||||
inserted: 2,
|
||||
updated: 1,
|
||||
taxonomySnapshotsHydrated: 1,
|
||||
taxonomySnapshotsFailed: 0
|
||||
},
|
||||
finished_at: '2026-03-14T10:07:00.000Z'
|
||||
});
|
||||
const failed = makeTask({
|
||||
id: 'task-2',
|
||||
status: 'failed',
|
||||
stage: 'sync.persist_filings',
|
||||
stage_detail: 'Persist failed for MSFT',
|
||||
error: 'Persist failed for MSFT',
|
||||
resource_key: 'sync_filings:MSFT',
|
||||
payload: { ticker: 'MSFT', limit: 20 },
|
||||
stage_context: {
|
||||
progress: { current: 5, total: 5, unit: 'filings' },
|
||||
counters: { fetched: 5, inserted: 0, updated: 0, hydrated: 0, failed: 1 },
|
||||
subject: { ticker: 'MSFT', accessionNumber: '0000000000-26-000002' }
|
||||
},
|
||||
finished_at: '2026-03-14T10:08:00.000Z',
|
||||
updated_at: '2026-03-14T10:08:00.000Z'
|
||||
});
|
||||
|
||||
const entry = buildNotificationEntries({
|
||||
activeTasks: [],
|
||||
finishedTasks: [completed, failed],
|
||||
filingSyncBatch: batchState({
|
||||
active: false,
|
||||
terminalVisible: true,
|
||||
finishedAt: '2026-03-14T10:08:00.000Z'
|
||||
})
|
||||
}).find((candidate) => candidate.kind === 'filing_sync_batch');
|
||||
|
||||
expect(entry).toMatchObject({
|
||||
status: 'failed',
|
||||
statusLine: 'Filing sync finished with issues',
|
||||
detailLine: '2 tickers processed • 1 failed'
|
||||
});
|
||||
});
|
||||
|
||||
it('leaves non-sync tasks ungrouped', () => {
|
||||
const sync = makeTask();
|
||||
const refresh = makeTask({
|
||||
id: 'task-3',
|
||||
task_type: 'refresh_prices',
|
||||
resource_key: 'refresh_prices:portfolio',
|
||||
payload: {},
|
||||
stage: 'refresh.fetch_quotes',
|
||||
stage_detail: 'Fetching quotes',
|
||||
stage_context: {
|
||||
progress: { current: 3, total: 4, unit: 'tickers' },
|
||||
counters: { updatedCount: 2, holdings: 4 },
|
||||
subject: { ticker: 'NVDA' }
|
||||
}
|
||||
});
|
||||
|
||||
const entries = buildNotificationEntries({
|
||||
activeTasks: [sync, refresh],
|
||||
finishedTasks: [],
|
||||
filingSyncBatch: batchState({ taskIds: ['task-1'] })
|
||||
});
|
||||
|
||||
expect(entries.filter((entry) => entry.kind === 'single')).toHaveLength(1);
|
||||
expect(entries.some((entry) => entry.id === 'task-3')).toBe(true);
|
||||
expect(entries.some((entry) => entry.kind === 'filing_sync_batch')).toBe(true);
|
||||
});
|
||||
|
||||
it('ignores noisy filing sync detail-only changes in the aggregate signature', () => {
|
||||
const first = makeTask();
|
||||
const second = makeTask({
|
||||
id: 'task-2',
|
||||
resource_key: 'sync_filings:MSFT',
|
||||
payload: { ticker: 'MSFT', limit: 20 },
|
||||
stage_detail: 'Extracting taxonomy for MSFT',
|
||||
stage_context: {
|
||||
progress: { current: 2, total: 5, unit: 'filings' },
|
||||
counters: { fetched: 4, inserted: 1, updated: 2, hydrated: 0, failed: 0 },
|
||||
subject: { ticker: 'MSFT', accessionNumber: '0000000000-26-000002' }
|
||||
},
|
||||
updated_at: '2026-03-14T10:06:00.000Z'
|
||||
});
|
||||
|
||||
const original = buildNotificationEntries({
|
||||
activeTasks: [first, second],
|
||||
finishedTasks: [],
|
||||
filingSyncBatch: batchState()
|
||||
}).find((entry) => entry.kind === 'filing_sync_batch');
|
||||
|
||||
const { notification, ...secondCore } = second;
|
||||
void notification;
|
||||
const noisyUpdate = makeTask({
|
||||
...secondCore,
|
||||
stage_detail: 'Extracting taxonomy for MSFT filing 2/5'
|
||||
});
|
||||
const updated = buildNotificationEntries({
|
||||
activeTasks: [first, noisyUpdate],
|
||||
finishedTasks: [],
|
||||
filingSyncBatch: batchState()
|
||||
}).find((entry) => entry.kind === 'filing_sync_batch');
|
||||
|
||||
expect(original).toBeTruthy();
|
||||
expect(updated).toBeTruthy();
|
||||
expect(notificationEntrySignature(original!)).toBe(notificationEntrySignature(updated!));
|
||||
});
|
||||
});
|
||||
326
lib/task-notification-entries.ts
Normal file
326
lib/task-notification-entries.ts
Normal file
@@ -0,0 +1,326 @@
|
||||
import type {
|
||||
Task,
|
||||
TaskNotificationAction,
|
||||
TaskNotificationEntry,
|
||||
TaskNotificationStat
|
||||
} from '@/lib/types';
|
||||
|
||||
const FILING_SYNC_ENTRY_ID = 'filing-sync:active';
|
||||
const SYNC_STAT_LABELS = ['Fetched', 'Inserted', 'Updated', 'Hydrated', 'Failed'] as const;
|
||||
|
||||
export type FilingSyncBatchState = {
|
||||
active: boolean;
|
||||
taskIds: string[];
|
||||
latestTaskId: string | null;
|
||||
startedAt: string | null;
|
||||
finishedAt: string | null;
|
||||
terminalVisible: boolean;
|
||||
};
|
||||
|
||||
export const EMPTY_FILING_SYNC_BATCH_STATE: FilingSyncBatchState = {
|
||||
active: false,
|
||||
taskIds: [],
|
||||
latestTaskId: null,
|
||||
startedAt: null,
|
||||
finishedAt: null,
|
||||
terminalVisible: false
|
||||
};
|
||||
|
||||
function asRecord(value: unknown) {
|
||||
return value && typeof value === 'object' && !Array.isArray(value)
|
||||
? value as Record<string, unknown>
|
||||
: null;
|
||||
}
|
||||
|
||||
function asString(value: unknown) {
|
||||
return typeof value === 'string' && value.trim().length > 0 ? value.trim() : null;
|
||||
}
|
||||
|
||||
function formatInteger(value: number) {
|
||||
return new Intl.NumberFormat('en-US', { maximumFractionDigits: 0 }).format(value);
|
||||
}
|
||||
|
||||
function parseInteger(value: string) {
|
||||
const parsed = Number(value.replace(/,/g, ''));
|
||||
return Number.isFinite(parsed) ? parsed : null;
|
||||
}
|
||||
|
||||
function isTerminalTask(task: Task) {
|
||||
return task.status === 'completed' || task.status === 'failed';
|
||||
}
|
||||
|
||||
function isSyncTask(task: Task) {
|
||||
return task.task_type === 'sync_filings';
|
||||
}
|
||||
|
||||
function latestTask(tasks: Task[]) {
|
||||
return [...tasks].sort((left, right) => (
|
||||
new Date(right.updated_at).getTime() - new Date(left.updated_at).getTime()
|
||||
))[0] ?? null;
|
||||
}
|
||||
|
||||
function taskTicker(task: Task) {
|
||||
const payload = asRecord(task.payload);
|
||||
const result = asRecord(task.result);
|
||||
|
||||
if (typeof task.stage_context?.subject?.ticker === 'string' && task.stage_context.subject.ticker.trim().length > 0) {
|
||||
return task.stage_context.subject.ticker.trim().toUpperCase();
|
||||
}
|
||||
|
||||
return (
|
||||
asString(result?.ticker)
|
||||
?? asString(payload?.ticker)
|
||||
?? (task.resource_key?.startsWith('sync_filings:') ? task.resource_key.slice('sync_filings:'.length) : null)
|
||||
)?.toUpperCase() ?? null;
|
||||
}
|
||||
|
||||
function taskNotificationEntry(task: Task): TaskNotificationEntry {
|
||||
return {
|
||||
id: task.id,
|
||||
kind: 'single',
|
||||
status: task.status,
|
||||
title: task.notification.title,
|
||||
statusLine: task.notification.statusLine,
|
||||
detailLine: task.notification.detailLine,
|
||||
progress: task.notification.progress,
|
||||
stats: task.notification.stats,
|
||||
updatedAt: task.updated_at,
|
||||
primaryTaskId: task.id,
|
||||
taskIds: [task.id],
|
||||
actions: task.notification.actions,
|
||||
notificationReadAt: task.notification_read_at,
|
||||
notificationSilencedAt: task.notification_silenced_at
|
||||
};
|
||||
}
|
||||
|
||||
function sumStats(tasks: Task[]) {
|
||||
return SYNC_STAT_LABELS.flatMap((label) => {
|
||||
let seen = false;
|
||||
let total = 0;
|
||||
|
||||
for (const task of tasks) {
|
||||
const stat = task.notification.stats.find((entry) => entry.label === label);
|
||||
if (!stat) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const parsed = parseInteger(stat.value);
|
||||
if (parsed === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
seen = true;
|
||||
total += parsed;
|
||||
}
|
||||
|
||||
return seen ? [{ label, value: formatInteger(total) } satisfies TaskNotificationStat] : [];
|
||||
});
|
||||
}
|
||||
|
||||
function aggregateProgress(tasks: Task[]) {
|
||||
const progressEntries = tasks
|
||||
.map((task) => task.notification.progress)
|
||||
.filter((progress): progress is NonNullable<Task['notification']['progress']> => Boolean(progress));
|
||||
|
||||
if (progressEntries.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const unit = progressEntries[0]?.unit ?? null;
|
||||
if (!unit || progressEntries.some((progress) => progress.unit !== unit)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const current = progressEntries.reduce((sum, progress) => sum + progress.current, 0);
|
||||
const total = progressEntries.reduce((sum, progress) => sum + progress.total, 0);
|
||||
const percent = total > 0 ? Math.min(100, Math.max(0, Math.round((current / total) * 100))) : null;
|
||||
|
||||
return { current, total, unit, percent };
|
||||
}
|
||||
|
||||
function buildFilingActions(tasks: Task[]): TaskNotificationAction[] {
|
||||
const tickers = [...new Set(tasks.map((task) => taskTicker(task)).filter((ticker): ticker is string => Boolean(ticker)))];
|
||||
const href = tickers.length === 1
|
||||
? `/filings?ticker=${encodeURIComponent(tickers[0] ?? '')}`
|
||||
: '/filings';
|
||||
|
||||
return [
|
||||
{
|
||||
id: 'open_filings',
|
||||
label: 'Open filings',
|
||||
href,
|
||||
primary: true
|
||||
},
|
||||
{
|
||||
id: 'open_details',
|
||||
label: 'Open details',
|
||||
href: null
|
||||
}
|
||||
];
|
||||
}
|
||||
|
||||
function aggregateNotificationState(tasks: Task[]) {
|
||||
if (tasks.length === 0) {
|
||||
return {
|
||||
notificationReadAt: null,
|
||||
notificationSilencedAt: null
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
notificationReadAt: tasks.every((task) => task.notification_read_at !== null)
|
||||
? latestTask(tasks)?.notification_read_at ?? null
|
||||
: null,
|
||||
notificationSilencedAt: tasks.every((task) => task.notification_silenced_at !== null)
|
||||
? latestTask(tasks)?.notification_silenced_at ?? null
|
||||
: null
|
||||
};
|
||||
}
|
||||
|
||||
function buildActiveFilingSyncEntry(activeTasks: Task[], memberTasks: Task[], batch: FilingSyncBatchState) {
|
||||
const sourceTasks = memberTasks.length > 0 ? memberTasks : activeTasks;
|
||||
const latest = latestTask(activeTasks) ?? latestTask(sourceTasks);
|
||||
if (!latest) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const tickers = [...new Set(sourceTasks.map((task) => taskTicker(task)).filter((ticker): ticker is string => Boolean(ticker)))];
|
||||
const runningCount = activeTasks.filter((task) => task.status === 'running').length;
|
||||
const queuedCount = activeTasks.filter((task) => task.status === 'queued').length;
|
||||
const notificationState = aggregateNotificationState(sourceTasks);
|
||||
|
||||
return {
|
||||
id: FILING_SYNC_ENTRY_ID,
|
||||
kind: 'filing_sync_batch',
|
||||
status: runningCount > 0 ? 'running' : 'queued',
|
||||
title: 'Filing sync',
|
||||
statusLine: `Syncing filings for ${tickers.length || activeTasks.length} ${tickers.length === 1 || activeTasks.length === 1 ? 'ticker' : 'tickers'}`,
|
||||
detailLine: tickers.length === 1 && latest.notification.detailLine
|
||||
? latest.notification.detailLine
|
||||
: `${runningCount} running • ${queuedCount} queued`,
|
||||
progress: aggregateProgress(sourceTasks),
|
||||
stats: sumStats(sourceTasks),
|
||||
updatedAt: latest.updated_at,
|
||||
primaryTaskId: batch.latestTaskId ?? latest.id,
|
||||
taskIds: batch.taskIds.length > 0 ? batch.taskIds : sourceTasks.map((task) => task.id),
|
||||
actions: buildFilingActions(sourceTasks),
|
||||
notificationReadAt: notificationState.notificationReadAt,
|
||||
notificationSilencedAt: notificationState.notificationSilencedAt,
|
||||
meta: {
|
||||
tickerCount: tickers.length || activeTasks.length,
|
||||
runningCount,
|
||||
queuedCount,
|
||||
failureCount: 0
|
||||
}
|
||||
} satisfies TaskNotificationEntry;
|
||||
}
|
||||
|
||||
function buildTerminalFilingSyncEntry(tasks: Task[], batch: FilingSyncBatchState) {
|
||||
if (!batch.terminalVisible || tasks.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const latest = latestTask(tasks);
|
||||
if (!latest) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const tickers = [...new Set(tasks.map((task) => taskTicker(task)).filter((ticker): ticker is string => Boolean(ticker)))];
|
||||
const failureCount = tasks.filter((task) => task.status === 'failed').length;
|
||||
const notificationState = aggregateNotificationState(tasks);
|
||||
|
||||
return {
|
||||
id: FILING_SYNC_ENTRY_ID,
|
||||
kind: 'filing_sync_batch',
|
||||
status: failureCount > 0 ? 'failed' : 'completed',
|
||||
title: 'Filing sync',
|
||||
statusLine: failureCount > 0 ? 'Filing sync finished with issues' : 'Finished syncing filings',
|
||||
detailLine: `${tickers.length || tasks.length} ${tickers.length === 1 || tasks.length === 1 ? 'ticker' : 'tickers'} processed${failureCount > 0 ? ` • ${failureCount} failed` : ''}`,
|
||||
progress: aggregateProgress(tasks),
|
||||
stats: sumStats(tasks),
|
||||
updatedAt: batch.finishedAt ?? latest.updated_at,
|
||||
primaryTaskId: batch.latestTaskId ?? latest.id,
|
||||
taskIds: batch.taskIds.length > 0 ? batch.taskIds : tasks.map((task) => task.id),
|
||||
actions: buildFilingActions(tasks),
|
||||
notificationReadAt: notificationState.notificationReadAt,
|
||||
notificationSilencedAt: notificationState.notificationSilencedAt,
|
||||
meta: {
|
||||
tickerCount: tickers.length || tasks.length,
|
||||
runningCount: 0,
|
||||
queuedCount: 0,
|
||||
failureCount
|
||||
}
|
||||
} satisfies TaskNotificationEntry;
|
||||
}
|
||||
|
||||
export function buildNotificationEntries(input: {
|
||||
activeTasks: Task[];
|
||||
finishedTasks: Task[];
|
||||
filingSyncBatch: FilingSyncBatchState;
|
||||
}) {
|
||||
const entries: TaskNotificationEntry[] = [];
|
||||
const batchTaskIds = new Set(input.filingSyncBatch.taskIds);
|
||||
|
||||
for (const task of input.activeTasks) {
|
||||
if (!isSyncTask(task)) {
|
||||
entries.push(taskNotificationEntry(task));
|
||||
}
|
||||
}
|
||||
|
||||
for (const task of input.finishedTasks) {
|
||||
if (!isSyncTask(task)) {
|
||||
entries.push(taskNotificationEntry(task));
|
||||
}
|
||||
}
|
||||
|
||||
const activeSyncTasks = input.activeTasks.filter(isSyncTask);
|
||||
const knownBatchSyncTasks = batchTaskIds.size > 0
|
||||
? [...input.activeTasks, ...input.finishedTasks].filter((task) => isSyncTask(task) && batchTaskIds.has(task.id))
|
||||
: activeSyncTasks;
|
||||
|
||||
const filingEntry = activeSyncTasks.length > 0
|
||||
? buildActiveFilingSyncEntry(activeSyncTasks, knownBatchSyncTasks, input.filingSyncBatch)
|
||||
: buildTerminalFilingSyncEntry(knownBatchSyncTasks.filter(isTerminalTask), input.filingSyncBatch);
|
||||
|
||||
if (filingEntry) {
|
||||
entries.push(filingEntry);
|
||||
}
|
||||
|
||||
return entries.sort((left, right) => (
|
||||
new Date(right.updatedAt).getTime() - new Date(left.updatedAt).getTime()
|
||||
));
|
||||
}
|
||||
|
||||
export function isFilingSyncEntry(entry: TaskNotificationEntry) {
|
||||
return entry.kind === 'filing_sync_batch';
|
||||
}
|
||||
|
||||
export function notificationEntrySignature(entry: TaskNotificationEntry) {
|
||||
if (!isFilingSyncEntry(entry)) {
|
||||
return JSON.stringify({
|
||||
kind: entry.kind,
|
||||
status: entry.status,
|
||||
statusLine: entry.statusLine,
|
||||
detailLine: entry.detailLine,
|
||||
progress: entry.progress,
|
||||
stats: entry.stats,
|
||||
primaryTaskId: entry.primaryTaskId
|
||||
});
|
||||
}
|
||||
|
||||
const progressBucket = entry.progress?.percent === null || entry.progress?.percent === undefined
|
||||
? null
|
||||
: Math.floor(entry.progress.percent / 10) * 10;
|
||||
const primaryAction = entry.actions.find((action) => action.primary && action.href) ?? null;
|
||||
|
||||
return JSON.stringify({
|
||||
kind: entry.kind,
|
||||
status: entry.status,
|
||||
progressBucket,
|
||||
runningCount: entry.meta?.runningCount ?? 0,
|
||||
queuedCount: entry.meta?.queuedCount ?? 0,
|
||||
failureCount: entry.meta?.failureCount ?? 0,
|
||||
primaryTaskId: entry.primaryTaskId,
|
||||
primaryHref: primaryAction?.href ?? null
|
||||
});
|
||||
}
|
||||
23
lib/types.ts
23
lib/types.ts
@@ -199,6 +199,29 @@ export type TaskNotificationView = {
|
||||
actions: TaskNotificationAction[];
|
||||
};
|
||||
|
||||
export type TaskNotificationEntry = {
|
||||
id: string;
|
||||
kind: 'single' | 'filing_sync_batch';
|
||||
status: TaskStatus;
|
||||
title: string;
|
||||
statusLine: string;
|
||||
detailLine: string | null;
|
||||
progress: TaskNotificationView['progress'];
|
||||
stats: TaskNotificationStat[];
|
||||
updatedAt: string;
|
||||
primaryTaskId: string;
|
||||
taskIds: string[];
|
||||
actions: TaskNotificationAction[];
|
||||
notificationReadAt: string | null;
|
||||
notificationSilencedAt: string | null;
|
||||
meta?: {
|
||||
tickerCount: number;
|
||||
runningCount: number;
|
||||
queuedCount: number;
|
||||
failureCount: number;
|
||||
};
|
||||
};
|
||||
|
||||
export type Task = {
|
||||
id: string;
|
||||
user_id: string;
|
||||
|
||||
Reference in New Issue
Block a user