From 0d6c6842275fab8fc185440d470607e94cf23561 Mon Sep 17 00:00:00 2001 From: francy51 Date: Sat, 14 Mar 2026 19:32:09 -0400 Subject: [PATCH] Collapse filing sync notifications into one batch surface --- .../task-notifications-trigger.tsx | 235 ++++---- components/shell/app-shell.tsx | 10 +- hooks/use-task-notifications-center.ts | 501 ++++++++++++------ lib/server/api/app.ts | 7 +- .../api/task-workflow-hybrid.e2e.test.ts | 54 ++ lib/server/tasks.ts | 21 + lib/task-notification-entries.test.ts | 251 +++++++++ lib/task-notification-entries.ts | 326 ++++++++++++ lib/types.ts | 23 + 9 files changed, 1148 insertions(+), 280 deletions(-) create mode 100644 lib/task-notification-entries.test.ts create mode 100644 lib/task-notification-entries.ts diff --git a/components/notifications/task-notifications-trigger.tsx b/components/notifications/task-notifications-trigger.tsx index 0729d9d..20e6630 100644 --- a/components/notifications/task-notifications-trigger.tsx +++ b/components/notifications/task-notifications-trigger.tsx @@ -2,7 +2,7 @@ import { formatDistanceToNow } from 'date-fns'; import { Bell, BellRing, LoaderCircle } from 'lucide-react'; -import type { Task } from '@/lib/types'; +import type { TaskNotificationEntry } from '@/lib/types'; import { StatusPill } from '@/components/ui/status-pill'; import { cn } from '@/lib/utils'; @@ -11,20 +11,20 @@ type TaskNotificationsTriggerProps = { isPopoverOpen: boolean; setIsPopoverOpen: (value: boolean) => void; isLoading: boolean; - activeTasks: Task[]; - visibleFinishedTasks: Task[]; - awaitingReviewTasks: Task[]; + activeEntries: TaskNotificationEntry[]; + visibleFinishedEntries: TaskNotificationEntry[]; + awaitingReviewEntries: TaskNotificationEntry[]; showReadFinished: boolean; setShowReadFinished: (value: boolean) => void; openTaskDetails: (taskId: string) => void; - openTaskAction: (task: Task, actionId?: string | null) => void; - silenceTask: (taskId: string, silenced?: boolean) => Promise; - markTaskRead: (taskId: string, read?: boolean) => Promise; + openTaskAction: (entry: TaskNotificationEntry, actionId?: string | null) => void; + silenceEntry: (entry: TaskNotificationEntry, silenced?: boolean) => Promise; + markEntryRead: (entry: TaskNotificationEntry, read?: boolean) => Promise; className?: string; }; -function ProgressBar({ task }: { task: Task }) { - const progress = task.notification.progress; +function ProgressBar({ entry }: { entry: TaskNotificationEntry }) { + const progress = entry.progress; if (!progress) { return null; } @@ -45,14 +45,14 @@ function ProgressBar({ task }: { task: Task }) { ); } -function StatChips({ task }: { task: Task }) { - if (task.notification.stats.length === 0) { +function StatChips({ entry }: { entry: TaskNotificationEntry }) { + if (entry.stats.length === 0) { return null; } return (
- {task.notification.stats.map((stat) => ( + {entry.stats.map((stat) => ( void; + openTaskAction: (entry: TaskNotificationEntry, actionId?: string | null) => void; + silenceEntry: (entry: TaskNotificationEntry, silenced?: boolean) => Promise; + markEntryRead: (entry: TaskNotificationEntry, read?: boolean) => Promise; +}) { + const isRead = entry.notificationReadAt !== null; + + return ( +
+
+

{entry.title}

+ +
+

{entry.statusLine}

+ {entry.detailLine ? ( +

{entry.detailLine}

+ ) : null} + + +

+ {formatDistanceToNow(new Date(entry.updatedAt), { addSuffix: true })} +

+
+ {entry.actions + .filter((action) => action.primary && action.id !== 'open_details') + .slice(0, 1) + .map((action) => ( + + ))} + + {entry.status === 'queued' || entry.status === 'running' ? ( + + ) : ( + + )} +
+
+ ); +} + export function TaskNotificationsTrigger({ unreadCount, isPopoverOpen, setIsPopoverOpen, isLoading, - activeTasks, - visibleFinishedTasks, - awaitingReviewTasks, + activeEntries, + visibleFinishedEntries, + awaitingReviewEntries, showReadFinished, setShowReadFinished, openTaskDetails, openTaskAction, - silenceTask, - markTaskRead, + silenceEntry, + markEntryRead, className }: TaskNotificationsTriggerProps) { const button = ( @@ -128,7 +205,7 @@ export function TaskNotificationsTrigger({ className="h-4 w-4 accent-[color:var(--accent)]" /> -
Unread finished: {awaitingReviewTasks.length}
+
Unread finished: {awaitingReviewEntries.length}
{isLoading ? (
@@ -148,117 +225,37 @@ export function TaskNotificationsTrigger({

Active jobs

- {activeTasks.length === 0 ? ( + {activeEntries.length === 0 ? (

No active jobs.

) : ( - activeTasks.map((task) => ( -
-
-

{task.notification.title}

- -
-

{task.notification.statusLine}

- {task.notification.detailLine ? ( -

{task.notification.detailLine}

- ) : null} - - -

- {formatDistanceToNow(new Date(task.updated_at), { addSuffix: true })} -

-
- {task.notification.actions - .filter((action) => action.primary && action.id !== 'open_details') - .slice(0, 1) - .map((action) => ( - - ))} - - -
-
+ activeEntries.map((entry) => ( + )) )}

Awaiting review

- {visibleFinishedTasks.length === 0 ? ( + {visibleFinishedEntries.length === 0 ? (

No finished jobs to review.

) : ( - visibleFinishedTasks.map((task) => { - const isRead = task.notification_read_at !== null; - - return ( -
-
-

{task.notification.title}

- -
-

{task.notification.statusLine}

- {task.notification.detailLine ? ( -

{task.notification.detailLine}

- ) : null} - - -

- {formatDistanceToNow(new Date(task.updated_at), { addSuffix: true })} -

-
- {task.notification.actions - .filter((action) => action.primary && action.id !== 'open_details') - .slice(0, 1) - .map((action) => ( - - ))} - - -
-
- ); - }) + visibleFinishedEntries.map((entry) => ( + + )) )}
diff --git a/components/shell/app-shell.tsx b/components/shell/app-shell.tsx index 3eb595d..3bcea89 100644 --- a/components/shell/app-shell.tsx +++ b/components/shell/app-shell.tsx @@ -662,15 +662,15 @@ export function AppShell({ isPopoverOpen={notifications.isPopoverOpen} setIsPopoverOpen={notifications.setIsPopoverOpen} isLoading={notifications.isLoading} - activeTasks={notifications.activeTasks} - visibleFinishedTasks={notifications.visibleFinishedTasks} - awaitingReviewTasks={notifications.awaitingReviewTasks} + activeEntries={notifications.activeEntries} + visibleFinishedEntries={notifications.visibleFinishedEntries} + awaitingReviewEntries={notifications.awaitingReviewEntries} showReadFinished={notifications.showReadFinished} setShowReadFinished={notifications.setShowReadFinished} openTaskDetails={notifications.openTaskDetails} openTaskAction={notifications.openTaskAction} - silenceTask={notifications.silenceTask} - markTaskRead={notifications.markTaskRead} + silenceEntry={notifications.silenceEntry} + markEntryRead={notifications.markEntryRead} />
diff --git a/hooks/use-task-notifications-center.ts b/hooks/use-task-notifications-center.ts index 82d84f0..7d55644 100644 --- a/hooks/use-task-notifications-center.ts +++ b/hooks/use-task-notifications-center.ts @@ -8,7 +8,14 @@ import { listRecentTasks, updateTaskNotificationState } from '@/lib/api'; -import type { Task, TaskStatus } from '@/lib/types'; +import { + buildNotificationEntries, + EMPTY_FILING_SYNC_BATCH_STATE, + isFilingSyncEntry, + notificationEntrySignature, + type FilingSyncBatchState +} from '@/lib/task-notification-entries'; +import type { Task, TaskNotificationEntry, TaskStatus } from '@/lib/types'; const ACTIVE_STATUSES: TaskStatus[] = ['queued', 'running']; const TERMINAL_STATUSES: TaskStatus[] = ['completed', 'failed']; @@ -17,19 +24,101 @@ function isTerminalTask(task: Task) { return TERMINAL_STATUSES.includes(task.status); } -function taskSignature(task: Task) { - return JSON.stringify({ - status: task.status, - stage: task.stage, - stageDetail: task.stage_detail, - stageContext: task.stage_context, - error: task.error, - result: isTerminalTask(task) ? task.result : null - }); +function isTerminalEntry(entry: TaskNotificationEntry) { + return TERMINAL_STATUSES.includes(entry.status); } -function taskProgressLabel(task: Task) { - const progress = task.notification.progress; +function shouldNotifyEntry(entry: TaskNotificationEntry) { + return entry.notificationSilencedAt === null; +} + +function isUnreadEntry(entry: TaskNotificationEntry) { + return entry.notificationReadAt === null; +} + +function sortTasksByUpdated(tasks: Task[]) { + return [...tasks].sort((left, right) => ( + new Date(right.updated_at).getTime() - new Date(left.updated_at).getTime() + )); +} + +function latestTask(tasks: Task[]) { + return sortTasksByUpdated(tasks)[0] ?? null; +} + +function taskIdsMatch(left: string[], right: string[]) { + if (left.length !== right.length) { + return false; + } + + return left.every((value, index) => value === right[index]); +} + +function sameBatchState(left: FilingSyncBatchState, right: FilingSyncBatchState) { + return ( + left.active === right.active + && left.latestTaskId === right.latestTaskId + && left.startedAt === right.startedAt + && left.finishedAt === right.finishedAt + && left.terminalVisible === right.terminalVisible + && taskIdsMatch(left.taskIds, right.taskIds) + ); +} + +function deriveFilingSyncBatch( + current: FilingSyncBatchState, + activeTasks: Task[], + finishedTasks: Task[] +) { + const activeSyncTasks = sortTasksByUpdated( + activeTasks.filter((task) => task.task_type === 'sync_filings') + ); + + if (activeSyncTasks.length > 0) { + const resetBatch = current.terminalVisible || (!current.active && current.taskIds.length === 0); + const taskIds = resetBatch + ? activeSyncTasks.map((task) => task.id) + : [...new Set([...current.taskIds, ...activeSyncTasks.map((task) => task.id)])]; + const newestTask = activeSyncTasks[0] ?? null; + const oldestActiveTask = [...activeSyncTasks].sort((left, right) => ( + new Date(left.created_at).getTime() - new Date(right.created_at).getTime() + ))[0] ?? null; + + return { + active: true, + taskIds, + latestTaskId: newestTask?.id ?? current.latestTaskId, + startedAt: resetBatch ? (oldestActiveTask?.created_at ?? newestTask?.created_at ?? null) : current.startedAt, + finishedAt: null, + terminalVisible: false + } satisfies FilingSyncBatchState; + } + + if (current.taskIds.length > 0 && (current.active || current.terminalVisible)) { + const batchTaskIds = new Set(current.taskIds); + const terminalMembers = finishedTasks.filter((task) => ( + task.task_type === 'sync_filings' && batchTaskIds.has(task.id) + )); + const newestTerminalTask = latestTask(terminalMembers); + + return { + ...current, + active: false, + latestTaskId: newestTerminalTask?.id ?? current.latestTaskId, + finishedAt: newestTerminalTask?.updated_at ?? current.finishedAt ?? current.startedAt, + terminalVisible: true + } satisfies FilingSyncBatchState; + } + + return EMPTY_FILING_SYNC_BATCH_STATE; +} + +function toastIdForEntry(entry: TaskNotificationEntry) { + return isFilingSyncEntry(entry) ? 'toast:filing-sync' : entry.primaryTaskId; +} + +function entryProgressLabel(entry: TaskNotificationEntry) { + const progress = entry.progress; if (!progress) { return null; } @@ -37,44 +126,31 @@ function taskProgressLabel(task: Task) { return `${progress.current}/${progress.total} ${progress.unit}`; } -function taskDescription(task: Task) { - const lines = [ - task.notification.statusLine, - task.notification.detailLine, - taskProgressLabel(task) - ].filter((value): value is string => Boolean(value)); - - return lines.join(' • '); -} - -function taskTitle(task: Task) { - return task.notification.title; -} - -function terminalToastDescription(task: Task) { - const topStat = task.notification.stats[0]; +function entryDescription(entry: TaskNotificationEntry) { return [ - task.notification.statusLine, - topStat ? `${topStat.label}: ${topStat.value}` : null, - task.notification.detailLine + entry.statusLine, + entry.detailLine, + entryProgressLabel(entry) ].filter((value): value is string => Boolean(value)).join(' • '); } -function shouldNotifyTask(task: Task) { - return !task.notification_silenced_at; -} +function terminalToastDescription(entry: TaskNotificationEntry) { + const topStat = entry.stats[0]; -function isUnread(task: Task) { - return task.notification_read_at === null; + return [ + entry.statusLine, + topStat ? `${topStat.label}: ${topStat.value}` : null, + entry.detailLine + ].filter((value): value is string => Boolean(value)).join(' • '); } type UseTaskNotificationsCenterResult = { - activeTasks: Task[]; - finishedTasks: Task[]; + activeEntries: TaskNotificationEntry[]; + finishedEntries: TaskNotificationEntry[]; unreadCount: number; isLoading: boolean; - awaitingReviewTasks: Task[]; - visibleFinishedTasks: Task[]; + awaitingReviewEntries: TaskNotificationEntry[]; + visibleFinishedEntries: TaskNotificationEntry[]; showReadFinished: boolean; setShowReadFinished: (value: boolean) => void; isPopoverOpen: boolean; @@ -84,9 +160,9 @@ type UseTaskNotificationsCenterResult = { isDetailOpen: boolean; setIsDetailOpen: (value: boolean) => void; openTaskDetails: (taskId: string) => void; - openTaskAction: (task: Task, actionId?: string | null) => void; - markTaskRead: (taskId: string, read?: boolean) => Promise; - silenceTask: (taskId: string, silenced?: boolean) => Promise; + openTaskAction: (entry: TaskNotificationEntry, actionId?: string | null) => void; + markEntryRead: (entry: TaskNotificationEntry, read?: boolean) => Promise; + silenceEntry: (entry: TaskNotificationEntry, silenced?: boolean) => Promise; refreshTasks: () => Promise; }; @@ -95,6 +171,7 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { const queryClient = useQueryClient(); const [activeTasks, setActiveTasks] = useState([]); const [finishedTasks, setFinishedTasks] = useState([]); + const [filingSyncBatch, setFilingSyncBatch] = useState(EMPTY_FILING_SYNC_BATCH_STATE); const [showReadFinished, setShowReadFinished] = useState(false); const [isPopoverOpen, setIsPopoverOpen] = useState(false); const [hasLoadedActive, setHasLoadedActive] = useState(false); @@ -108,6 +185,9 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { const invalidatedTerminalRef = useRef(new Set()); const activeSnapshotRef = useRef([]); const finishedSnapshotRef = useRef([]); + const filingSyncBatchRef = useRef(EMPTY_FILING_SYNC_BATCH_STATE); + const silenceEntryRef = useRef<(entry: TaskNotificationEntry, silenced?: boolean) => Promise>(async () => {}); + const markEntryReadRef = useRef<(entry: TaskNotificationEntry, read?: boolean) => Promise>(async () => {}); const [isDocumentVisible, setIsDocumentVisible] = useState(() => { if (typeof document === 'undefined') { return true; @@ -116,9 +196,23 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { return document.visibilityState === 'visible'; }); - const applyTaskLocally = useCallback((task: Task) => { - setActiveTasks((prev) => prev.map((entry) => (entry.id === task.id ? task : entry))); - setFinishedTasks((prev) => prev.map((entry) => (entry.id === task.id ? task : entry))); + const syncBatchState = useCallback((nextBatch: FilingSyncBatchState) => { + filingSyncBatchRef.current = nextBatch; + setFilingSyncBatch((current) => sameBatchState(current, nextBatch) ? current : nextBatch); + }, []); + + const mergeTasksLocally = useCallback((tasks: Task[]) => { + if (tasks.length === 0) { + return; + } + + const taskMap = new Map(tasks.map((task) => [task.id, task])); + const mergeList = (list: Task[]) => list.map((entry) => taskMap.get(entry.id) ?? entry); + + activeSnapshotRef.current = mergeList(activeSnapshotRef.current); + finishedSnapshotRef.current = mergeList(finishedSnapshotRef.current); + setActiveTasks((current) => mergeList(current)); + setFinishedTasks((current) => mergeList(current)); }, []); const invalidateForTerminalTask = useCallback((task: Task) => { @@ -163,15 +257,15 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { setIsPopoverOpen(false); }, []); - const openTaskAction = useCallback((task: Task, actionId?: string | null) => { + const openTaskAction = useCallback((entry: TaskNotificationEntry, actionId?: string | null) => { const action = actionId - ? task.notification.actions.find((entry) => entry.id === actionId) - : task.notification.actions.find((entry) => entry.primary && entry.id !== 'open_details') - ?? task.notification.actions.find((entry) => entry.id !== 'open_details') + ? entry.actions.find((candidate) => candidate.id === actionId) + : entry.actions.find((candidate) => candidate.primary && candidate.id !== 'open_details') + ?? entry.actions.find((candidate) => candidate.id !== 'open_details') ?? null; if (!action || action.id === 'open_details' || !action.href) { - openTaskDetails(task.id); + openTaskDetails(entry.primaryTaskId); return; } @@ -179,121 +273,205 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { router.push(action.href); }, [openTaskDetails, router]); - const silenceTask = useCallback(async (taskId: string, silenced = true) => { - try { - const { task } = await updateTaskNotificationState(taskId, { silenced }); - applyTaskLocally(task); - toast.dismiss(taskId); - } catch { - toast.error('Unable to update notification state'); - } - }, [applyTaskLocally]); + const emitEntryToast = useCallback((entry: TaskNotificationEntry) => { + const toastId = toastIdForEntry(entry); - const markTaskRead = useCallback(async (taskId: string, read = true) => { - try { - const { task } = await updateTaskNotificationState(taskId, { read }); - applyTaskLocally(task); - if (read) { - toast.dismiss(taskId); - } - } catch { - toast.error('Unable to update notification state'); - } - }, [applyTaskLocally]); - - const emitTaskToast = useCallback((task: Task) => { - if (!shouldNotifyTask(task)) { - toast.dismiss(task.id); + if (!shouldNotifyEntry(entry)) { + toast.dismiss(toastId); return; } - if (task.status === 'queued' || task.status === 'running') { - toast(taskTitle(task), { - id: task.id, + if (entry.status === 'queued' || entry.status === 'running') { + toast(entry.title, { + id: toastId, duration: Number.POSITIVE_INFINITY, - description: taskDescription(task), + description: entryDescription(entry), action: { label: 'Open details', - onClick: () => openTaskDetails(task.id) + onClick: () => openTaskDetails(entry.primaryTaskId) }, cancel: { label: 'Silence', onClick: () => { - void silenceTask(task.id, true); + void silenceEntryRef.current(entry, true); } } }); return; } - const toastBuilder = task.status === 'completed' ? toast.success : toast.error; - const primaryAction = task.notification.actions.find((entry) => entry.primary && entry.id !== 'open_details') - ?? task.notification.actions.find((entry) => entry.id !== 'open_details') + const toastBuilder = entry.status === 'completed' ? toast.success : toast.error; + const primaryAction = entry.actions.find((candidate) => candidate.primary && candidate.id !== 'open_details') + ?? entry.actions.find((candidate) => candidate.id !== 'open_details') ?? null; - toastBuilder(taskTitle(task), { - id: task.id, + toastBuilder(entry.title, { + id: toastId, duration: 10_000, - description: terminalToastDescription(task), + description: terminalToastDescription(entry), action: { label: primaryAction?.label ?? 'Open details', onClick: () => { if (primaryAction) { - openTaskAction(task, primaryAction.id); + openTaskAction(entry, primaryAction.id); return; } - openTaskDetails(task.id); + openTaskDetails(entry.primaryTaskId); } }, cancel: { label: 'Mark read', onClick: () => { - void markTaskRead(task.id, true); + void markEntryReadRef.current(entry, true); } } }); - }, [markTaskRead, openTaskAction, openTaskDetails, silenceTask]); - - const processSnapshots = useCallback(() => { - const active = activeSnapshotRef.current; - const finished = finishedSnapshotRef.current; - const all = [...active, ...finished]; + }, [openTaskAction, openTaskDetails]); + const processSnapshots = useCallback((nextBatch = filingSyncBatchRef.current) => { if (!activeLoadedRef.current || !finishedLoadedRef.current) { return; } + const entries = buildNotificationEntries({ + activeTasks: activeSnapshotRef.current, + finishedTasks: finishedSnapshotRef.current, + filingSyncBatch: nextBatch + }); + if (stateSignaturesRef.current.size === 0) { - for (const task of all) { - stateSignaturesRef.current.set(task.id, taskSignature(task)); + for (const entry of entries) { + stateSignaturesRef.current.set(entry.id, notificationEntrySignature(entry)); } return; } - for (const task of all) { - const signature = taskSignature(task); - const previousSignature = stateSignaturesRef.current.get(task.id); + for (const entry of entries) { + const signature = notificationEntrySignature(entry); + const previousSignature = stateSignaturesRef.current.get(entry.id); const wasKnown = previousSignature !== undefined; if (!wasKnown || previousSignature !== signature) { - emitTaskToast(task); + emitEntryToast(entry); - if (isTerminalTask(task)) { - invalidateForTerminalTask(task); + if (!isFilingSyncEntry(entry) && isTerminalEntry(entry)) { + const terminalTask = [ + ...activeSnapshotRef.current, + ...finishedSnapshotRef.current + ].find((task) => task.id === entry.primaryTaskId); + + if (terminalTask) { + invalidateForTerminalTask(terminalTask); + } + } + + if (isFilingSyncEntry(entry) && isTerminalEntry(entry)) { + for (const task of [...activeSnapshotRef.current, ...finishedSnapshotRef.current]) { + if (task.task_type === 'sync_filings' && entry.taskIds.includes(task.id) && isTerminalTask(task)) { + invalidateForTerminalTask(task); + } + } } } - stateSignaturesRef.current.set(task.id, signature); + stateSignaturesRef.current.set(entry.id, signature); } - const currentIds = new Set(all.map((task) => task.id)); + const currentIds = new Set(entries.map((entry) => entry.id)); for (const knownId of [...stateSignaturesRef.current.keys()]) { if (!currentIds.has(knownId)) { - toast.dismiss(knownId); + const toastId = knownId === 'filing-sync:active' ? 'toast:filing-sync' : knownId; + toast.dismiss(toastId); + stateSignaturesRef.current.delete(knownId); } } - }, [emitTaskToast, invalidateForTerminalTask]); + }, [emitEntryToast, invalidateForTerminalTask]); + + const applySnapshotState = useCallback(( + nextActiveTasks: Task[], + nextFinishedTasks: Task[], + loaded: { active?: boolean; finished?: boolean } = {} + ) => { + activeSnapshotRef.current = nextActiveTasks; + finishedSnapshotRef.current = nextFinishedTasks; + + if (loaded.active) { + activeLoadedRef.current = true; + setHasLoadedActive(true); + } + + if (loaded.finished) { + finishedLoadedRef.current = true; + setHasLoadedFinished(true); + } + + setActiveTasks(nextActiveTasks); + setFinishedTasks(nextFinishedTasks); + + const nextBatch = deriveFilingSyncBatch( + filingSyncBatchRef.current, + nextActiveTasks, + nextFinishedTasks + ); + + syncBatchState(nextBatch); + processSnapshots(nextBatch); + }, [processSnapshots, syncBatchState]); + + const updateEntryNotification = useCallback(async ( + entry: TaskNotificationEntry, + input: { read?: boolean; silenced?: boolean } + ) => { + const results = await Promise.allSettled( + entry.taskIds.map((taskId) => updateTaskNotificationState(taskId, input)) + ); + const updatedTasks = results.flatMap((result) => ( + result.status === 'fulfilled' ? [result.value.task] : [] + )); + + if (updatedTasks.length > 0) { + mergeTasksLocally(updatedTasks); + } + + let nextBatch = deriveFilingSyncBatch( + filingSyncBatchRef.current, + activeSnapshotRef.current, + finishedSnapshotRef.current + ); + + if ( + isFilingSyncEntry(entry) + && isTerminalEntry(entry) + && (input.read || input.silenced) + && results.every((result) => result.status === 'fulfilled') + ) { + nextBatch = EMPTY_FILING_SYNC_BATCH_STATE; + } + + syncBatchState(nextBatch); + processSnapshots(nextBatch); + + if (results.some((result) => result.status === 'rejected')) { + toast.error('Unable to update notification state'); + return; + } + + if (input.read || input.silenced) { + toast.dismiss(toastIdForEntry(entry)); + } + }, [mergeTasksLocally, processSnapshots, syncBatchState]); + + const silenceEntry = useCallback(async (entry: TaskNotificationEntry, silenced = true) => { + await updateEntryNotification(entry, { silenced }); + }, [updateEntryNotification]); + + const markEntryRead = useCallback(async (entry: TaskNotificationEntry, read = true) => { + await updateEntryNotification(entry, { read }); + }, [updateEntryNotification]); + + silenceEntryRef.current = silenceEntry; + markEntryReadRef.current = markEntryRead; const refreshTasks = useCallback(async () => { try { @@ -308,19 +486,11 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { }) ]); - activeSnapshotRef.current = activeRes.tasks; - finishedSnapshotRef.current = finishedRes.tasks; - activeLoadedRef.current = true; - finishedLoadedRef.current = true; - setHasLoadedActive(true); - setHasLoadedFinished(true); - setActiveTasks(activeRes.tasks); - setFinishedTasks(finishedRes.tasks); - processSnapshots(); + applySnapshotState(activeRes.tasks, finishedRes.tasks, { active: true, finished: true }); } catch { // ignore transient polling failures } - }, [processSnapshots]); + }, [applySnapshotState]); useEffect(() => { if (typeof document === 'undefined') { @@ -364,7 +534,13 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { return 4_000; } - if (finishedSnapshotRef.current.some((task) => isUnread(task))) { + const terminalEntries = buildNotificationEntries({ + activeTasks: activeSnapshotRef.current, + finishedTasks: finishedSnapshotRef.current, + filingSyncBatch: filingSyncBatchRef.current + }).filter(isTerminalEntry); + + if (terminalEntries.some((entry) => isUnreadEntry(entry))) { return 15_000; } @@ -386,11 +562,7 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { return; } - activeSnapshotRef.current = response.tasks; - activeLoadedRef.current = true; - setHasLoadedActive(true); - setActiveTasks(response.tasks); - processSnapshots(); + applySnapshotState(response.tasks, finishedSnapshotRef.current, { active: true }); } catch { // ignore transient polling failures } @@ -413,13 +585,27 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { return; } - finishedSnapshotRef.current = response.tasks; - finishedLoadedRef.current = true; - setHasLoadedFinished(true); - setFinishedTasks(response.tasks); - processSnapshots(); + applySnapshotState(activeSnapshotRef.current, response.tasks, { finished: true }); + + const signature = response.tasks + .map((task) => notificationEntrySignature({ + id: task.id, + kind: 'single', + status: task.status, + title: task.notification.title, + statusLine: task.notification.statusLine, + detailLine: task.notification.detailLine, + progress: task.notification.progress, + stats: task.notification.stats, + updatedAt: task.updated_at, + primaryTaskId: task.id, + taskIds: [task.id], + actions: task.notification.actions, + notificationReadAt: task.notification_read_at, + notificationSilencedAt: task.notification_silenced_at + })) + .join('||'); - const signature = response.tasks.map((task) => taskSignature(task)).join('||'); if (signature === previousTerminalSignature) { stableTerminalPolls += 1; } else { @@ -445,43 +631,52 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { clearTimeout(terminalTimer); } }; - }, [isDetailOpen, isDocumentVisible, isPopoverOpen, processSnapshots]); + }, [applySnapshotState, isDetailOpen, isDocumentVisible, isPopoverOpen]); - const normalizedActiveTasks = useMemo(() => { - return activeTasks.filter((task) => ACTIVE_STATUSES.includes(task.status)); - }, [activeTasks]); + const entries = useMemo(() => buildNotificationEntries({ + activeTasks, + finishedTasks, + filingSyncBatch + }), [activeTasks, filingSyncBatch, finishedTasks]); - const normalizedFinishedTasks = useMemo(() => { - return finishedTasks.filter((task) => TERMINAL_STATUSES.includes(task.status)); - }, [finishedTasks]); + const activeEntries = useMemo(() => { + return entries.filter((entry) => ACTIVE_STATUSES.includes(entry.status)); + }, [entries]); - const awaitingReviewTasks = useMemo(() => { - return normalizedFinishedTasks.filter((task) => isUnread(task)); - }, [normalizedFinishedTasks]); + const normalizedFinishedEntries = useMemo(() => { + return entries.filter((entry) => TERMINAL_STATUSES.includes(entry.status)); + }, [entries]); - const visibleFinishedTasks = useMemo(() => { + const awaitingReviewEntries = useMemo(() => { + return normalizedFinishedEntries.filter((entry) => isUnreadEntry(entry)); + }, [normalizedFinishedEntries]); + + const visibleFinishedEntries = useMemo(() => { if (showReadFinished) { - return normalizedFinishedTasks; + return normalizedFinishedEntries; } - return awaitingReviewTasks; - }, [awaitingReviewTasks, normalizedFinishedTasks, showReadFinished]); + return awaitingReviewEntries; + }, [awaitingReviewEntries, normalizedFinishedEntries, showReadFinished]); const unreadCount = useMemo(() => { - const unreadTerminal = normalizedFinishedTasks.filter((task) => isUnread(task)).length; - const unreadActive = normalizedActiveTasks.filter((task) => isUnread(task) && !task.notification_silenced_at).length; + const unreadTerminal = normalizedFinishedEntries.filter((entry) => isUnreadEntry(entry)).length; + const unreadActive = activeEntries.filter((entry) => ( + isUnreadEntry(entry) && entry.notificationSilencedAt === null + )).length; + return unreadTerminal + unreadActive; - }, [normalizedActiveTasks, normalizedFinishedTasks]); + }, [activeEntries, normalizedFinishedEntries]); const isLoading = !hasLoadedActive || !hasLoadedFinished; return { - activeTasks: normalizedActiveTasks, - finishedTasks: normalizedFinishedTasks, + activeEntries, + finishedEntries: normalizedFinishedEntries, unreadCount, isLoading, - awaitingReviewTasks, - visibleFinishedTasks, + awaitingReviewEntries, + visibleFinishedEntries, showReadFinished, setShowReadFinished, isPopoverOpen, @@ -492,8 +687,8 @@ export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { setIsDetailOpen, openTaskDetails, openTaskAction, - markTaskRead, - silenceTask, + markEntryRead, + silenceEntry, refreshTasks }; } diff --git a/lib/server/api/app.ts b/lib/server/api/app.ts index d8240c2..c50fe65 100644 --- a/lib/server/api/app.ts +++ b/lib/server/api/app.ts @@ -73,6 +73,7 @@ import { import { answerSearchQuery, searchKnowledgeBase } from '@/lib/server/search'; import { enqueueTask, + findOrEnqueueTask, findInFlightTask, getTaskById, getTaskTimeline, @@ -340,7 +341,7 @@ async function queueAutoFilingSync( metadata?: { category?: unknown; tags?: unknown } ) { try { - await enqueueTask({ + await findOrEnqueueTask({ userId, taskType: 'sync_filings', payload: buildSyncFilingsPayload({ @@ -1459,7 +1460,7 @@ export const app = new Elysia({ prefix: '/api' }) if (shouldQueueSync) { try { const watchlistItem = await getWatchlistItemByTicker(session.user.id, ticker); - await enqueueTask({ + await findOrEnqueueTask({ userId: session.user.id, taskType: 'sync_filings', payload: buildSyncFilingsPayload({ @@ -1661,7 +1662,7 @@ export const app = new Elysia({ prefix: '/api' }) try { const limit = typeof payload.limit === 'number' ? payload.limit : Number(payload.limit); - const task = await enqueueTask({ + const task = await findOrEnqueueTask({ userId: session.user.id, taskType: 'sync_filings', payload: buildSyncFilingsPayload({ diff --git a/lib/server/api/task-workflow-hybrid.e2e.test.ts b/lib/server/api/task-workflow-hybrid.e2e.test.ts index f8eeab2..61c0f04 100644 --- a/lib/server/api/task-workflow-hybrid.e2e.test.ts +++ b/lib/server/api/task-workflow-hybrid.e2e.test.ts @@ -467,6 +467,60 @@ if (process.env.RUN_TASK_WORKFLOW_E2E === '1') { expect(task.payload.tags).toEqual(['semis', 'ai']); }); + it('reuses the same in-flight filing sync task for repeated same-ticker requests', async () => { + const first = await jsonRequest('POST', '/api/filings/sync', { + ticker: 'NVDA', + limit: 20 + }); + const second = await jsonRequest('POST', '/api/filings/sync', { + ticker: 'nvda', + limit: 20 + }); + + expect(first.response.status).toBe(200); + expect(second.response.status).toBe(200); + + const firstTask = (first.json as { task: { id: string } }).task; + const secondTask = (second.json as { task: { id: string } }).task; + + expect(secondTask.id).toBe(firstTask.id); + + const tasksResponse = await jsonRequest('GET', '/api/tasks?limit=10&status=queued&status=running'); + expect(tasksResponse.response.status).toBe(200); + + const tasks = (tasksResponse.json as { + tasks: Array<{ id: string; task_type: string; payload: { ticker?: string } }>; + }).tasks.filter((task) => task.task_type === 'sync_filings' && task.payload.ticker === 'NVDA'); + + expect(tasks).toHaveLength(1); + }); + + it('lets different tickers queue independent filing sync tasks', async () => { + const nvda = await jsonRequest('POST', '/api/filings/sync', { ticker: 'NVDA', limit: 20 }); + const msft = await jsonRequest('POST', '/api/filings/sync', { ticker: 'MSFT', limit: 20 }); + const aapl = await jsonRequest('POST', '/api/filings/sync', { ticker: 'AAPL', limit: 20 }); + + const ids = [ + (nvda.json as { task: { id: string } }).task.id, + (msft.json as { task: { id: string } }).task.id, + (aapl.json as { task: { id: string } }).task.id + ]; + + expect(new Set(ids).size).toBe(3); + + const tasksResponse = await jsonRequest('GET', '/api/tasks?limit=10&status=queued&status=running'); + expect(tasksResponse.response.status).toBe(200); + + const syncTickers = (tasksResponse.json as { + tasks: Array<{ task_type: string; payload: { ticker?: string } }>; + }).tasks + .filter((task) => task.task_type === 'sync_filings') + .map((task) => task.payload.ticker) + .filter((ticker): ticker is string => typeof ticker === 'string'); + + expect(syncTickers.sort()).toEqual(['AAPL', 'MSFT', 'NVDA']); + }); + it('scopes the filings endpoint by ticker while leaving the global endpoint mixed', async () => { if (!sqliteClient) { throw new Error('sqlite client not initialized'); diff --git a/lib/server/tasks.ts b/lib/server/tasks.ts index 6841eb2..eaa8d4a 100644 --- a/lib/server/tasks.ts +++ b/lib/server/tasks.ts @@ -129,6 +129,27 @@ export async function enqueueTask(input: EnqueueTaskInput) { } } +export async function findOrEnqueueTask(input: EnqueueTaskInput) { + if (!input.resourceKey) { + return await enqueueTask(input); + } + + const existingTask = await findInFlightTaskByResourceKey( + input.userId, + input.taskType, + input.resourceKey + ); + + if (existingTask) { + const reconciledTask = await reconcileTaskWithWorkflow(existingTask); + if (reconciledTask.status === 'queued' || reconciledTask.status === 'running') { + return reconciledTask; + } + } + + return await enqueueTask(input); +} + export async function findInFlightTask(userId: string, taskType: TaskType, resourceKey: string) { const task = await findInFlightTaskByResourceKey(userId, taskType, resourceKey); diff --git a/lib/task-notification-entries.test.ts b/lib/task-notification-entries.test.ts new file mode 100644 index 0000000..eef8fa9 --- /dev/null +++ b/lib/task-notification-entries.test.ts @@ -0,0 +1,251 @@ +import { describe, expect, it } from 'bun:test'; +import { buildTaskNotification } from '@/lib/server/task-notifications'; +import { + buildNotificationEntries, + notificationEntrySignature, + type FilingSyncBatchState +} from '@/lib/task-notification-entries'; +import type { Task } from '@/lib/types'; + +function makeTask(overrides: Partial> = {}): Task { + const task = { + id: 'task-1', + user_id: 'user-1', + task_type: 'sync_filings', + status: 'running', + stage: 'sync.extract_taxonomy', + stage_detail: 'Extracting taxonomy for NVDA', + stage_context: { + progress: { + current: 2, + total: 5, + unit: 'filings' + }, + counters: { + fetched: 5, + inserted: 2, + updated: 1, + hydrated: 1, + failed: 0 + }, + subject: { + ticker: 'NVDA', + accessionNumber: '0000000000-26-000001' + } + }, + resource_key: 'sync_filings:NVDA', + notification_read_at: null, + notification_silenced_at: null, + priority: 50, + payload: { + ticker: 'NVDA', + limit: 20 + }, + result: null, + error: null, + attempts: 1, + max_attempts: 3, + workflow_run_id: 'run-1', + created_at: '2026-03-14T10:00:00.000Z', + updated_at: '2026-03-14T10:05:00.000Z', + finished_at: null, + ...overrides + } satisfies Omit; + + return { + ...task, + notification: buildTaskNotification(task) + }; +} + +function batchState(overrides: Partial = {}): FilingSyncBatchState { + return { + active: true, + taskIds: ['task-1', 'task-2'], + latestTaskId: 'task-2', + startedAt: '2026-03-14T10:00:00.000Z', + finishedAt: null, + terminalVisible: false, + ...overrides + }; +} + +describe('task notification entries', () => { + it('collapses multiple active filing sync tasks into one aggregate entry', () => { + const first = makeTask(); + const second = makeTask({ + id: 'task-2', + resource_key: 'sync_filings:MSFT', + payload: { ticker: 'MSFT', limit: 20 }, + stage_context: { + progress: { current: 3, total: 5, unit: 'filings' }, + counters: { fetched: 4, inserted: 1, updated: 2, hydrated: 0, failed: 0 }, + subject: { ticker: 'MSFT', accessionNumber: '0000000000-26-000002' } + }, + updated_at: '2026-03-14T10:06:00.000Z' + }); + + const entries = buildNotificationEntries({ + activeTasks: [first, second], + finishedTasks: [], + filingSyncBatch: batchState() + }); + + const filingEntries = entries.filter((entry) => entry.kind === 'filing_sync_batch'); + expect(filingEntries).toHaveLength(1); + expect(filingEntries[0]).toMatchObject({ + id: 'filing-sync:active', + status: 'running', + statusLine: 'Syncing filings for 2 tickers', + detailLine: '2 running • 0 queued', + primaryTaskId: 'task-2' + }); + expect(filingEntries[0]?.stats).toEqual([ + { label: 'Fetched', value: '9' }, + { label: 'Inserted', value: '3' }, + { label: 'Updated', value: '3' }, + { label: 'Hydrated', value: '1' }, + { label: 'Failed', value: '0' } + ]); + }); + + it('builds active filing sync detail from mixed queued and running tasks', () => { + const running = makeTask(); + const queued = makeTask({ + id: 'task-2', + status: 'queued', + stage: 'queued', + stage_detail: 'Queued for filings sync', + resource_key: 'sync_filings:AAPL', + payload: { ticker: 'AAPL', limit: 20 }, + stage_context: { + progress: { current: 0, total: 5, unit: 'filings' }, + counters: { fetched: 0, inserted: 0, updated: 0, hydrated: 0, failed: 0 }, + subject: { ticker: 'AAPL', accessionNumber: '0000000000-26-000003' } + }, + updated_at: '2026-03-14T10:04:00.000Z' + }); + + const entry = buildNotificationEntries({ + activeTasks: [running, queued], + finishedTasks: [], + filingSyncBatch: batchState() + }).find((candidate) => candidate.kind === 'filing_sync_batch'); + + expect(entry?.detailLine).toBe('1 running • 1 queued'); + }); + + it('keeps one terminal filing sync summary for mixed completion outcomes', () => { + const completed = makeTask({ + status: 'completed', + stage: 'completed', + stage_detail: 'Completed sync for NVDA', + result: { + ticker: 'NVDA', + fetched: 5, + inserted: 2, + updated: 1, + taxonomySnapshotsHydrated: 1, + taxonomySnapshotsFailed: 0 + }, + finished_at: '2026-03-14T10:07:00.000Z' + }); + const failed = makeTask({ + id: 'task-2', + status: 'failed', + stage: 'sync.persist_filings', + stage_detail: 'Persist failed for MSFT', + error: 'Persist failed for MSFT', + resource_key: 'sync_filings:MSFT', + payload: { ticker: 'MSFT', limit: 20 }, + stage_context: { + progress: { current: 5, total: 5, unit: 'filings' }, + counters: { fetched: 5, inserted: 0, updated: 0, hydrated: 0, failed: 1 }, + subject: { ticker: 'MSFT', accessionNumber: '0000000000-26-000002' } + }, + finished_at: '2026-03-14T10:08:00.000Z', + updated_at: '2026-03-14T10:08:00.000Z' + }); + + const entry = buildNotificationEntries({ + activeTasks: [], + finishedTasks: [completed, failed], + filingSyncBatch: batchState({ + active: false, + terminalVisible: true, + finishedAt: '2026-03-14T10:08:00.000Z' + }) + }).find((candidate) => candidate.kind === 'filing_sync_batch'); + + expect(entry).toMatchObject({ + status: 'failed', + statusLine: 'Filing sync finished with issues', + detailLine: '2 tickers processed • 1 failed' + }); + }); + + it('leaves non-sync tasks ungrouped', () => { + const sync = makeTask(); + const refresh = makeTask({ + id: 'task-3', + task_type: 'refresh_prices', + resource_key: 'refresh_prices:portfolio', + payload: {}, + stage: 'refresh.fetch_quotes', + stage_detail: 'Fetching quotes', + stage_context: { + progress: { current: 3, total: 4, unit: 'tickers' }, + counters: { updatedCount: 2, holdings: 4 }, + subject: { ticker: 'NVDA' } + } + }); + + const entries = buildNotificationEntries({ + activeTasks: [sync, refresh], + finishedTasks: [], + filingSyncBatch: batchState({ taskIds: ['task-1'] }) + }); + + expect(entries.filter((entry) => entry.kind === 'single')).toHaveLength(1); + expect(entries.some((entry) => entry.id === 'task-3')).toBe(true); + expect(entries.some((entry) => entry.kind === 'filing_sync_batch')).toBe(true); + }); + + it('ignores noisy filing sync detail-only changes in the aggregate signature', () => { + const first = makeTask(); + const second = makeTask({ + id: 'task-2', + resource_key: 'sync_filings:MSFT', + payload: { ticker: 'MSFT', limit: 20 }, + stage_detail: 'Extracting taxonomy for MSFT', + stage_context: { + progress: { current: 2, total: 5, unit: 'filings' }, + counters: { fetched: 4, inserted: 1, updated: 2, hydrated: 0, failed: 0 }, + subject: { ticker: 'MSFT', accessionNumber: '0000000000-26-000002' } + }, + updated_at: '2026-03-14T10:06:00.000Z' + }); + + const original = buildNotificationEntries({ + activeTasks: [first, second], + finishedTasks: [], + filingSyncBatch: batchState() + }).find((entry) => entry.kind === 'filing_sync_batch'); + + const { notification, ...secondCore } = second; + void notification; + const noisyUpdate = makeTask({ + ...secondCore, + stage_detail: 'Extracting taxonomy for MSFT filing 2/5' + }); + const updated = buildNotificationEntries({ + activeTasks: [first, noisyUpdate], + finishedTasks: [], + filingSyncBatch: batchState() + }).find((entry) => entry.kind === 'filing_sync_batch'); + + expect(original).toBeTruthy(); + expect(updated).toBeTruthy(); + expect(notificationEntrySignature(original!)).toBe(notificationEntrySignature(updated!)); + }); +}); diff --git a/lib/task-notification-entries.ts b/lib/task-notification-entries.ts new file mode 100644 index 0000000..51526d0 --- /dev/null +++ b/lib/task-notification-entries.ts @@ -0,0 +1,326 @@ +import type { + Task, + TaskNotificationAction, + TaskNotificationEntry, + TaskNotificationStat +} from '@/lib/types'; + +const FILING_SYNC_ENTRY_ID = 'filing-sync:active'; +const SYNC_STAT_LABELS = ['Fetched', 'Inserted', 'Updated', 'Hydrated', 'Failed'] as const; + +export type FilingSyncBatchState = { + active: boolean; + taskIds: string[]; + latestTaskId: string | null; + startedAt: string | null; + finishedAt: string | null; + terminalVisible: boolean; +}; + +export const EMPTY_FILING_SYNC_BATCH_STATE: FilingSyncBatchState = { + active: false, + taskIds: [], + latestTaskId: null, + startedAt: null, + finishedAt: null, + terminalVisible: false +}; + +function asRecord(value: unknown) { + return value && typeof value === 'object' && !Array.isArray(value) + ? value as Record + : null; +} + +function asString(value: unknown) { + return typeof value === 'string' && value.trim().length > 0 ? value.trim() : null; +} + +function formatInteger(value: number) { + return new Intl.NumberFormat('en-US', { maximumFractionDigits: 0 }).format(value); +} + +function parseInteger(value: string) { + const parsed = Number(value.replace(/,/g, '')); + return Number.isFinite(parsed) ? parsed : null; +} + +function isTerminalTask(task: Task) { + return task.status === 'completed' || task.status === 'failed'; +} + +function isSyncTask(task: Task) { + return task.task_type === 'sync_filings'; +} + +function latestTask(tasks: Task[]) { + return [...tasks].sort((left, right) => ( + new Date(right.updated_at).getTime() - new Date(left.updated_at).getTime() + ))[0] ?? null; +} + +function taskTicker(task: Task) { + const payload = asRecord(task.payload); + const result = asRecord(task.result); + + if (typeof task.stage_context?.subject?.ticker === 'string' && task.stage_context.subject.ticker.trim().length > 0) { + return task.stage_context.subject.ticker.trim().toUpperCase(); + } + + return ( + asString(result?.ticker) + ?? asString(payload?.ticker) + ?? (task.resource_key?.startsWith('sync_filings:') ? task.resource_key.slice('sync_filings:'.length) : null) + )?.toUpperCase() ?? null; +} + +function taskNotificationEntry(task: Task): TaskNotificationEntry { + return { + id: task.id, + kind: 'single', + status: task.status, + title: task.notification.title, + statusLine: task.notification.statusLine, + detailLine: task.notification.detailLine, + progress: task.notification.progress, + stats: task.notification.stats, + updatedAt: task.updated_at, + primaryTaskId: task.id, + taskIds: [task.id], + actions: task.notification.actions, + notificationReadAt: task.notification_read_at, + notificationSilencedAt: task.notification_silenced_at + }; +} + +function sumStats(tasks: Task[]) { + return SYNC_STAT_LABELS.flatMap((label) => { + let seen = false; + let total = 0; + + for (const task of tasks) { + const stat = task.notification.stats.find((entry) => entry.label === label); + if (!stat) { + continue; + } + + const parsed = parseInteger(stat.value); + if (parsed === null) { + continue; + } + + seen = true; + total += parsed; + } + + return seen ? [{ label, value: formatInteger(total) } satisfies TaskNotificationStat] : []; + }); +} + +function aggregateProgress(tasks: Task[]) { + const progressEntries = tasks + .map((task) => task.notification.progress) + .filter((progress): progress is NonNullable => Boolean(progress)); + + if (progressEntries.length === 0) { + return null; + } + + const unit = progressEntries[0]?.unit ?? null; + if (!unit || progressEntries.some((progress) => progress.unit !== unit)) { + return null; + } + + const current = progressEntries.reduce((sum, progress) => sum + progress.current, 0); + const total = progressEntries.reduce((sum, progress) => sum + progress.total, 0); + const percent = total > 0 ? Math.min(100, Math.max(0, Math.round((current / total) * 100))) : null; + + return { current, total, unit, percent }; +} + +function buildFilingActions(tasks: Task[]): TaskNotificationAction[] { + const tickers = [...new Set(tasks.map((task) => taskTicker(task)).filter((ticker): ticker is string => Boolean(ticker)))]; + const href = tickers.length === 1 + ? `/filings?ticker=${encodeURIComponent(tickers[0] ?? '')}` + : '/filings'; + + return [ + { + id: 'open_filings', + label: 'Open filings', + href, + primary: true + }, + { + id: 'open_details', + label: 'Open details', + href: null + } + ]; +} + +function aggregateNotificationState(tasks: Task[]) { + if (tasks.length === 0) { + return { + notificationReadAt: null, + notificationSilencedAt: null + }; + } + + return { + notificationReadAt: tasks.every((task) => task.notification_read_at !== null) + ? latestTask(tasks)?.notification_read_at ?? null + : null, + notificationSilencedAt: tasks.every((task) => task.notification_silenced_at !== null) + ? latestTask(tasks)?.notification_silenced_at ?? null + : null + }; +} + +function buildActiveFilingSyncEntry(activeTasks: Task[], memberTasks: Task[], batch: FilingSyncBatchState) { + const sourceTasks = memberTasks.length > 0 ? memberTasks : activeTasks; + const latest = latestTask(activeTasks) ?? latestTask(sourceTasks); + if (!latest) { + return null; + } + + const tickers = [...new Set(sourceTasks.map((task) => taskTicker(task)).filter((ticker): ticker is string => Boolean(ticker)))]; + const runningCount = activeTasks.filter((task) => task.status === 'running').length; + const queuedCount = activeTasks.filter((task) => task.status === 'queued').length; + const notificationState = aggregateNotificationState(sourceTasks); + + return { + id: FILING_SYNC_ENTRY_ID, + kind: 'filing_sync_batch', + status: runningCount > 0 ? 'running' : 'queued', + title: 'Filing sync', + statusLine: `Syncing filings for ${tickers.length || activeTasks.length} ${tickers.length === 1 || activeTasks.length === 1 ? 'ticker' : 'tickers'}`, + detailLine: tickers.length === 1 && latest.notification.detailLine + ? latest.notification.detailLine + : `${runningCount} running • ${queuedCount} queued`, + progress: aggregateProgress(sourceTasks), + stats: sumStats(sourceTasks), + updatedAt: latest.updated_at, + primaryTaskId: batch.latestTaskId ?? latest.id, + taskIds: batch.taskIds.length > 0 ? batch.taskIds : sourceTasks.map((task) => task.id), + actions: buildFilingActions(sourceTasks), + notificationReadAt: notificationState.notificationReadAt, + notificationSilencedAt: notificationState.notificationSilencedAt, + meta: { + tickerCount: tickers.length || activeTasks.length, + runningCount, + queuedCount, + failureCount: 0 + } + } satisfies TaskNotificationEntry; +} + +function buildTerminalFilingSyncEntry(tasks: Task[], batch: FilingSyncBatchState) { + if (!batch.terminalVisible || tasks.length === 0) { + return null; + } + + const latest = latestTask(tasks); + if (!latest) { + return null; + } + + const tickers = [...new Set(tasks.map((task) => taskTicker(task)).filter((ticker): ticker is string => Boolean(ticker)))]; + const failureCount = tasks.filter((task) => task.status === 'failed').length; + const notificationState = aggregateNotificationState(tasks); + + return { + id: FILING_SYNC_ENTRY_ID, + kind: 'filing_sync_batch', + status: failureCount > 0 ? 'failed' : 'completed', + title: 'Filing sync', + statusLine: failureCount > 0 ? 'Filing sync finished with issues' : 'Finished syncing filings', + detailLine: `${tickers.length || tasks.length} ${tickers.length === 1 || tasks.length === 1 ? 'ticker' : 'tickers'} processed${failureCount > 0 ? ` • ${failureCount} failed` : ''}`, + progress: aggregateProgress(tasks), + stats: sumStats(tasks), + updatedAt: batch.finishedAt ?? latest.updated_at, + primaryTaskId: batch.latestTaskId ?? latest.id, + taskIds: batch.taskIds.length > 0 ? batch.taskIds : tasks.map((task) => task.id), + actions: buildFilingActions(tasks), + notificationReadAt: notificationState.notificationReadAt, + notificationSilencedAt: notificationState.notificationSilencedAt, + meta: { + tickerCount: tickers.length || tasks.length, + runningCount: 0, + queuedCount: 0, + failureCount + } + } satisfies TaskNotificationEntry; +} + +export function buildNotificationEntries(input: { + activeTasks: Task[]; + finishedTasks: Task[]; + filingSyncBatch: FilingSyncBatchState; +}) { + const entries: TaskNotificationEntry[] = []; + const batchTaskIds = new Set(input.filingSyncBatch.taskIds); + + for (const task of input.activeTasks) { + if (!isSyncTask(task)) { + entries.push(taskNotificationEntry(task)); + } + } + + for (const task of input.finishedTasks) { + if (!isSyncTask(task)) { + entries.push(taskNotificationEntry(task)); + } + } + + const activeSyncTasks = input.activeTasks.filter(isSyncTask); + const knownBatchSyncTasks = batchTaskIds.size > 0 + ? [...input.activeTasks, ...input.finishedTasks].filter((task) => isSyncTask(task) && batchTaskIds.has(task.id)) + : activeSyncTasks; + + const filingEntry = activeSyncTasks.length > 0 + ? buildActiveFilingSyncEntry(activeSyncTasks, knownBatchSyncTasks, input.filingSyncBatch) + : buildTerminalFilingSyncEntry(knownBatchSyncTasks.filter(isTerminalTask), input.filingSyncBatch); + + if (filingEntry) { + entries.push(filingEntry); + } + + return entries.sort((left, right) => ( + new Date(right.updatedAt).getTime() - new Date(left.updatedAt).getTime() + )); +} + +export function isFilingSyncEntry(entry: TaskNotificationEntry) { + return entry.kind === 'filing_sync_batch'; +} + +export function notificationEntrySignature(entry: TaskNotificationEntry) { + if (!isFilingSyncEntry(entry)) { + return JSON.stringify({ + kind: entry.kind, + status: entry.status, + statusLine: entry.statusLine, + detailLine: entry.detailLine, + progress: entry.progress, + stats: entry.stats, + primaryTaskId: entry.primaryTaskId + }); + } + + const progressBucket = entry.progress?.percent === null || entry.progress?.percent === undefined + ? null + : Math.floor(entry.progress.percent / 10) * 10; + const primaryAction = entry.actions.find((action) => action.primary && action.href) ?? null; + + return JSON.stringify({ + kind: entry.kind, + status: entry.status, + progressBucket, + runningCount: entry.meta?.runningCount ?? 0, + queuedCount: entry.meta?.queuedCount ?? 0, + failureCount: entry.meta?.failureCount ?? 0, + primaryTaskId: entry.primaryTaskId, + primaryHref: primaryAction?.href ?? null + }); +} diff --git a/lib/types.ts b/lib/types.ts index 717e8b1..4935231 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -199,6 +199,29 @@ export type TaskNotificationView = { actions: TaskNotificationAction[]; }; +export type TaskNotificationEntry = { + id: string; + kind: 'single' | 'filing_sync_batch'; + status: TaskStatus; + title: string; + statusLine: string; + detailLine: string | null; + progress: TaskNotificationView['progress']; + stats: TaskNotificationStat[]; + updatedAt: string; + primaryTaskId: string; + taskIds: string[]; + actions: TaskNotificationAction[]; + notificationReadAt: string | null; + notificationSilencedAt: string | null; + meta?: { + tickerCount: number; + runningCount: number; + queuedCount: number; + failureCount: number; + }; +}; + export type Task = { id: string; user_id: string;