327 lines
10 KiB
TypeScript
327 lines
10 KiB
TypeScript
import type {
|
|
Task,
|
|
TaskNotificationAction,
|
|
TaskNotificationEntry,
|
|
TaskNotificationStat
|
|
} from '@/lib/types';
|
|
|
|
/** Fixed entry id shared by the active and terminal filing-sync batch notifications. */
const FILING_SYNC_ENTRY_ID = 'filing-sync:active';

/** Stat labels that are summed across sync tasks, in the order they are emitted. */
const SYNC_STAT_LABELS = ['Fetched', 'Inserted', 'Updated', 'Hydrated', 'Failed'] as const;
|
|
|
|
/**
 * Bookkeeping for the group of filing-sync tasks that is presented as a
 * single batch notification.
 */
export type FilingSyncBatchState = {
  /** NOTE(review): not read in this module; presumably true while the batch is in flight. */
  active: boolean;
  /** Ids of the tasks belonging to the batch; used to pick batch members out of the task lists. */
  taskIds: string[];
  /** Preferred `primaryTaskId` for the batch entry when set. */
  latestTaskId: string | null;
  /** NOTE(review): not read in this module; presumably the batch start timestamp. */
  startedAt: string | null;
  /** When set, used as `updatedAt` of the terminal batch entry. */
  finishedAt: string | null;
  /** The terminal (finished) batch entry is only built while this is true. */
  terminalVisible: boolean;
};
|
|
|
|
/**
 * Batch state with no tasks, no timestamps, and every flag off.
 *
 * NOTE(review): `taskIds` is a shared array instance; callers should copy
 * rather than mutate it.
 */
export const EMPTY_FILING_SYNC_BATCH_STATE: FilingSyncBatchState = {
  active: false,
  taskIds: [],
  latestTaskId: null,
  startedAt: null,
  finishedAt: null,
  terminalVisible: false
};
|
|
|
|
/**
 * Narrows an unknown value to a plain key/value record.
 *
 * @returns The value itself when it is a non-null, non-array object; otherwise null.
 */
function asRecord(value: unknown): Record<string, unknown> | null {
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    return null;
  }

  return value as Record<string, unknown>;
}
|
|
|
|
/**
 * Returns the trimmed text of a string value.
 *
 * @returns The trimmed string, or null when the value is not a string or is
 *   empty after trimming.
 */
function asString(value: unknown): string | null {
  if (typeof value !== 'string') {
    return null;
  }

  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : null;
}
|
|
|
|
/**
 * Formats a number for display with en-US grouping separators and no
 * fraction digits (fractional input is rounded by the formatter).
 */
function formatInteger(value: number) {
  return value.toLocaleString('en-US', { maximumFractionDigits: 0 });
}
|
|
|
|
/**
 * Parses a display-formatted number such as "1,234" back into a number.
 *
 * Thousands separators (commas) are stripped before conversion.
 *
 * @param value - Text to parse.
 * @returns The parsed finite number, or null when the text is blank or not
 *   a finite number.
 */
function parseInteger(value: string) {
  const normalized = value.replace(/,/g, '').trim();

  // Number('') and Number('   ') evaluate to 0; a blank value must not be
  // reported as a real zero.
  if (normalized.length === 0) {
    return null;
  }

  const parsed = Number(normalized);
  return Number.isFinite(parsed) ? parsed : null;
}
|
|
|
|
/** True when the task status is 'completed' or 'failed'. */
function isTerminalTask(task: Task) {
  switch (task.status) {
    case 'completed':
    case 'failed':
      return true;
    default:
      return false;
  }
}
|
|
|
|
/** True when the task's `task_type` is 'sync_filings'. */
function isSyncTask(task: Task) {
  return task.task_type === 'sync_filings';
}
|
|
|
|
/**
 * Finds the task with the most recent `updated_at` timestamp.
 *
 * Runs in a single pass instead of copying and sorting the whole list.
 * When several tasks share the newest timestamp, the first one in input
 * order is returned. A task whose `updated_at` cannot be parsed is ranked
 * as oldest, so it never displaces a task with a valid timestamp.
 *
 * @param tasks - Tasks to inspect; the array is not modified.
 * @returns The most recently updated task, or null for an empty list.
 */
function latestTask(tasks: Task[]) {
  let latest: Task | null = null;
  let latestTime = Number.NEGATIVE_INFINITY;

  for (const task of tasks) {
    const time = new Date(task.updated_at).getTime();
    // getTime() yields NaN for unparseable dates; NaN compares false with
    // everything, so map it to the lowest possible rank explicitly.
    const rank = Number.isNaN(time) ? Number.NEGATIVE_INFINITY : time;

    if (latest === null || rank > latestTime) {
      latest = task;
      latestTime = rank;
    }
  }

  return latest;
}
|
|
|
|
/**
 * Resolves the ticker symbol a task refers to, upper-cased.
 *
 * Sources are tried in order: `stage_context.subject.ticker`, then
 * `result.ticker`, then `payload.ticker`, then the part of `resource_key`
 * after the `sync_filings:` prefix. Every candidate is trimmed and a blank
 * candidate is skipped, so the function never returns an empty string.
 *
 * @param task - Task to inspect.
 * @returns The upper-cased ticker, or null when no source provides one.
 */
function taskTicker(task: Task) {
  const resourcePrefix = 'sync_filings:';

  const contextTicker = asString(task.stage_context?.subject?.ticker);
  if (contextTicker) {
    return contextTicker.toUpperCase();
  }

  const payload = asRecord(task.payload);
  const result = asRecord(task.result);

  // Route the resource-key suffix through asString as well, so a bare
  // "sync_filings:" key or a padded suffix behaves like the other sources.
  const resourceTicker = task.resource_key?.startsWith(resourcePrefix)
    ? asString(task.resource_key.slice(resourcePrefix.length))
    : null;

  return (
    asString(result?.ticker)
    ?? asString(payload?.ticker)
    ?? resourceTicker
  )?.toUpperCase() ?? null;
}
|
|
|
|
/**
 * Builds the stand-alone ('single') notification entry for one task by
 * copying the task's notification fields and read/silenced timestamps.
 */
function taskNotificationEntry(task: Task): TaskNotificationEntry {
  const { id, status, notification } = task;
  const { title, statusLine, detailLine, progress, stats, actions } = notification;

  return {
    id,
    kind: 'single',
    status,
    title,
    statusLine,
    detailLine,
    progress,
    stats,
    updatedAt: task.updated_at,
    primaryTaskId: id,
    taskIds: [id],
    actions,
    notificationReadAt: task.notification_read_at,
    notificationSilencedAt: task.notification_silenced_at
  };
}
|
|
|
|
/**
 * Adds up the per-task notification stats for every label in
 * SYNC_STAT_LABELS.
 *
 * For each label the first matching stat of every task is parsed and
 * summed. A label is left out of the result when no task contributes a
 * parseable value for it.
 */
function sumStats(tasks: Task[]) {
  return SYNC_STAT_LABELS.flatMap((label) => {
    const amounts: number[] = [];

    for (const { notification } of tasks) {
      const match = notification.stats.find((entry) => entry.label === label);
      const amount = match ? parseInteger(match.value) : null;

      if (amount !== null) {
        amounts.push(amount);
      }
    }

    const total = amounts.reduce((sum, amount) => sum + amount, 0);

    return amounts.length > 0
      ? [{ label, value: formatInteger(total) } satisfies TaskNotificationStat]
      : [];
  });
}
|
|
|
|
/**
 * Combines the progress of several tasks into one progress value.
 *
 * Tasks without progress are ignored. The result is null when no task
 * reports progress, when the first reported unit is missing, or when the
 * reported units differ. `percent` is the rounded ratio clamped to 0..100,
 * or null when the summed total is not positive.
 */
function aggregateProgress(tasks: Task[]) {
  const reported: NonNullable<Task['notification']['progress']>[] = [];

  for (const { notification } of tasks) {
    if (notification.progress) {
      reported.push(notification.progress);
    }
  }

  const first = reported[0];
  if (!first) {
    return null;
  }

  const unit = first.unit ?? null;
  if (!unit) {
    return null;
  }

  let current = 0;
  let total = 0;

  for (const progress of reported) {
    // Progress values counted in different units cannot be added together.
    if (progress.unit !== unit) {
      return null;
    }

    current += progress.current;
    total += progress.total;
  }

  let percent: number | null = null;
  if (total > 0) {
    const rounded = Math.round((current / total) * 100);
    percent = Math.min(100, Math.max(0, rounded));
  }

  return { current, total, unit, percent };
}
|
|
|
|
/**
 * Builds the two actions attached to a filing-sync batch entry.
 *
 * The primary "Open filings" link is filtered to a ticker only when the
 * tasks resolve to exactly one distinct ticker; otherwise it points at the
 * unfiltered filings page. "Open details" carries no href.
 */
function buildFilingActions(tasks: Task[]): TaskNotificationAction[] {
  const distinctTickers = new Set<string>();

  for (const task of tasks) {
    const ticker = taskTicker(task);
    if (ticker) {
      distinctTickers.add(ticker);
    }
  }

  let href = '/filings';
  if (distinctTickers.size === 1) {
    const [onlyTicker] = distinctTickers;
    href = `/filings?ticker=${encodeURIComponent(onlyTicker ?? '')}`;
  }

  const openFilings: TaskNotificationAction = {
    id: 'open_filings',
    label: 'Open filings',
    href,
    primary: true
  };
  const openDetails: TaskNotificationAction = {
    id: 'open_details',
    label: 'Open details',
    href: null
  };

  return [openFilings, openDetails];
}
|
|
|
|
/**
 * Derives the read/silenced timestamps for a group of tasks.
 *
 * A timestamp is reported only when every task in the group has it set; the
 * value is then taken from the most recently updated task. An empty group
 * yields null for both fields.
 */
function aggregateNotificationState(tasks: Task[]) {
  const hasTasks = tasks.length > 0;
  const newest = hasTasks ? latestTask(tasks) : null;

  const allRead = hasTasks && tasks.every((task) => task.notification_read_at !== null);
  const allSilenced = hasTasks && tasks.every((task) => task.notification_silenced_at !== null);

  return {
    notificationReadAt: allRead ? (newest?.notification_read_at ?? null) : null,
    notificationSilencedAt: allSilenced ? (newest?.notification_silenced_at ?? null) : null
  };
}
|
|
|
|
/**
 * Builds the batch notification entry shown while filing-sync tasks are
 * still queued or running.
 *
 * Progress, stats, tickers, actions and read/silenced state are aggregated
 * over `memberTasks` when provided, otherwise over `activeTasks`. Running
 * and queued counts always come from `activeTasks`.
 *
 * @param activeTasks - Sync tasks that are currently active.
 * @param memberTasks - Known tasks of the batch (may include finished ones).
 * @param batch - Batch bookkeeping; supplies the preferred primary task id
 *   and task id list when set.
 * @returns The batch entry, or null when there is no task to describe.
 */
function buildActiveFilingSyncEntry(activeTasks: Task[], memberTasks: Task[], batch: FilingSyncBatchState) {
  const sourceTasks = memberTasks.length > 0 ? memberTasks : activeTasks;
  const latest = latestTask(activeTasks) ?? latestTask(sourceTasks);
  if (!latest) {
    return null;
  }

  const tickers = [...new Set(sourceTasks.map((task) => taskTicker(task)).filter((ticker): ticker is string => Boolean(ticker)))];
  const runningCount = activeTasks.filter((task) => task.status === 'running').length;
  const queuedCount = activeTasks.filter((task) => task.status === 'queued').length;
  const notificationState = aggregateNotificationState(sourceTasks);

  // The noun must agree with the number that is actually displayed. Keying
  // it off activeTasks.length produced "3 ticker" when several batch tickers
  // were known but only one task was still active.
  const tickerCount = tickers.length || activeTasks.length;
  const tickerNoun = tickerCount === 1 ? 'ticker' : 'tickers';

  return {
    id: FILING_SYNC_ENTRY_ID,
    kind: 'filing_sync_batch',
    status: runningCount > 0 ? 'running' : 'queued',
    title: 'Filing sync',
    statusLine: `Syncing filings for ${tickerCount} ${tickerNoun}`,
    detailLine: tickers.length === 1 && latest.notification.detailLine
      ? latest.notification.detailLine
      : `${runningCount} running • ${queuedCount} queued`,
    progress: aggregateProgress(sourceTasks),
    stats: sumStats(sourceTasks),
    updatedAt: latest.updated_at,
    primaryTaskId: batch.latestTaskId ?? latest.id,
    taskIds: batch.taskIds.length > 0 ? batch.taskIds : sourceTasks.map((task) => task.id),
    actions: buildFilingActions(sourceTasks),
    notificationReadAt: notificationState.notificationReadAt,
    notificationSilencedAt: notificationState.notificationSilencedAt,
    meta: {
      tickerCount,
      runningCount,
      queuedCount,
      // NOTE(review): always 0 while the batch is active, even if a member
      // task has already failed — confirm this is intended.
      failureCount: 0
    }
  } satisfies TaskNotificationEntry;
}
|
|
|
|
/**
 * Builds the batch notification entry shown after all filing-sync tasks of
 * a batch have finished.
 *
 * Returns null unless the batch is flagged `terminalVisible` and at least
 * one task is supplied. The entry status is 'failed' as soon as any task
 * failed, otherwise 'completed'.
 */
function buildTerminalFilingSyncEntry(tasks: Task[], batch: FilingSyncBatchState) {
  if (!batch.terminalVisible) {
    return null;
  }

  // latestTask yields null for an empty list, which also covers the
  // "no tasks" case.
  const latest = latestTask(tasks);
  if (!latest) {
    return null;
  }

  const tickers = [...new Set(tasks.map((task) => taskTicker(task)).filter((ticker): ticker is string => Boolean(ticker)))];
  const failureCount = tasks.filter((task) => task.status === 'failed').length;
  const hasFailures = failureCount > 0;
  const notificationState = aggregateNotificationState(tasks);

  const tickerCount = tickers.length || tasks.length;
  const tickerNoun = tickerCount === 1 ? 'ticker' : 'tickers';
  const failureSuffix = hasFailures ? ` • ${failureCount} failed` : '';

  return {
    id: FILING_SYNC_ENTRY_ID,
    kind: 'filing_sync_batch',
    status: failureCount > 0 ? 'failed' : 'completed',
    title: 'Filing sync',
    statusLine: hasFailures ? 'Filing sync finished with issues' : 'Finished syncing filings',
    detailLine: `${tickerCount} ${tickerNoun} processed${failureSuffix}`,
    progress: aggregateProgress(tasks),
    stats: sumStats(tasks),
    updatedAt: batch.finishedAt ?? latest.updated_at,
    primaryTaskId: batch.latestTaskId ?? latest.id,
    taskIds: batch.taskIds.length > 0 ? batch.taskIds : tasks.map((task) => task.id),
    actions: buildFilingActions(tasks),
    notificationReadAt: notificationState.notificationReadAt,
    notificationSilencedAt: notificationState.notificationSilencedAt,
    meta: {
      tickerCount,
      runningCount: 0,
      queuedCount: 0,
      failureCount
    }
  } satisfies TaskNotificationEntry;
}
|
|
|
|
/**
 * Produces the notification list for the given tasks, newest first.
 *
 * Every non-sync task (active first, then finished) becomes its own entry.
 * Filing-sync tasks are collapsed into at most one batch entry: the active
 * entry while any sync task is active, otherwise the terminal entry built
 * from the finished tasks known to the batch.
 *
 * @param input.activeTasks - Tasks currently active.
 * @param input.finishedTasks - Tasks that have finished.
 * @param input.filingSyncBatch - Batch bookkeeping for filing-sync tasks.
 * @returns Entries sorted by `updatedAt`, most recent first.
 */
export function buildNotificationEntries(input: {
  activeTasks: Task[];
  finishedTasks: Task[];
  filingSyncBatch: FilingSyncBatchState;
}) {
  const { activeTasks, finishedTasks, filingSyncBatch } = input;
  const allTasks = [...activeTasks, ...finishedTasks];

  const entries: TaskNotificationEntry[] = allTasks
    .filter((task) => !isSyncTask(task))
    .map((task) => taskNotificationEntry(task));

  const batchTaskIds = new Set(filingSyncBatch.taskIds);
  const activeSyncTasks = activeTasks.filter(isSyncTask);

  // Without recorded batch ids, fall back to whatever sync tasks are active.
  const knownBatchSyncTasks = batchTaskIds.size > 0
    ? allTasks.filter((task) => isSyncTask(task) && batchTaskIds.has(task.id))
    : activeSyncTasks;

  let filingEntry: TaskNotificationEntry | null;
  if (activeSyncTasks.length > 0) {
    filingEntry = buildActiveFilingSyncEntry(activeSyncTasks, knownBatchSyncTasks, filingSyncBatch);
  } else {
    filingEntry = buildTerminalFilingSyncEntry(knownBatchSyncTasks.filter(isTerminalTask), filingSyncBatch);
  }

  if (filingEntry) {
    entries.push(filingEntry);
  }

  return entries.sort((left, right) => {
    const leftTime = new Date(left.updatedAt).getTime();
    const rightTime = new Date(right.updatedAt).getTime();
    return rightTime - leftTime;
  });
}
|
|
|
|
/** True when the entry is the collapsed filing-sync batch entry (kind 'filing_sync_batch'). */
export function isFilingSyncEntry(entry: TaskNotificationEntry) {
  return entry.kind === 'filing_sync_batch';
}
|
|
|
|
/**
 * Serializes the fields of an entry that identify its visible state, so two
 * entries can be compared by string equality.
 *
 * Single-task entries include their text lines, progress and stats. Batch
 * entries use a coarser signature: progress is bucketed to steps of ten
 * percent and only the counts, primary task and primary link are included.
 */
export function notificationEntrySignature(entry: TaskNotificationEntry) {
  if (isFilingSyncEntry(entry)) {
    const percent = entry.progress?.percent ?? null;
    // Round down to the nearest multiple of 10.
    const progressBucket = percent === null ? null : Math.floor(percent / 10) * 10;
    const primaryAction = entry.actions.find((action) => action.primary && action.href) ?? null;

    return JSON.stringify({
      kind: entry.kind,
      status: entry.status,
      progressBucket,
      runningCount: entry.meta?.runningCount ?? 0,
      queuedCount: entry.meta?.queuedCount ?? 0,
      failureCount: entry.meta?.failureCount ?? 0,
      primaryTaskId: entry.primaryTaskId,
      primaryHref: primaryAction?.href ?? null
    });
  }

  const { kind, status, statusLine, detailLine, progress, stats, primaryTaskId } = entry;

  return JSON.stringify({
    kind,
    status,
    statusLine,
    detailLine,
    progress,
    stats,
    primaryTaskId
  });
}
|