Add category and tags granularity to company sync flows

This commit is contained in:
2026-03-03 23:10:08 -05:00
parent 717e747869
commit 610fce8db3
12 changed files with 415 additions and 29 deletions

View File

@@ -26,6 +26,7 @@ import {
import { getLatestPortfolioInsight } from '@/lib/server/repos/insights';
import {
deleteWatchlistItemRecord,
getWatchlistItemByTicker,
listWatchlistItems,
upsertWatchlistItemRecord
} from '@/lib/server/repos/watchlist';
@@ -86,6 +87,39 @@ function asBoolean(value: unknown, fallback = false) {
return fallback;
}
function asOptionalString(value: unknown) {
if (typeof value !== 'string') {
return null;
}
const normalized = value.trim();
return normalized.length > 0 ? normalized : null;
}
function asTags(value: unknown) {
const source = Array.isArray(value)
? value
: typeof value === 'string'
? value.split(',')
: [];
const unique = new Set<string>();
for (const entry of source) {
if (typeof entry !== 'string') {
continue;
}
const tag = entry.trim();
if (!tag) {
continue;
}
unique.add(tag);
}
return [...unique];
}
function asStatementMode(value: unknown): FinancialStatementMode {
return FINANCIAL_STATEMENT_MODES.includes(value as FinancialStatementMode)
? value as FinancialStatementMode
@@ -115,15 +149,38 @@ function withFinancialMetricsPolicy(filing: Filing): Filing {
};
}
async function queueAutoFilingSync(userId: string, ticker: string) {
function buildSyncFilingsPayload(input: {
ticker: string;
limit: number;
category?: unknown;
tags?: unknown;
}) {
const category = asOptionalString(input.category);
const tags = asTags(input.tags);
return {
ticker: input.ticker,
limit: input.limit,
...(category ? { category } : {}),
...(tags.length > 0 ? { tags } : {})
};
}
async function queueAutoFilingSync(
userId: string,
ticker: string,
metadata?: { category?: unknown; tags?: unknown }
) {
try {
await enqueueTask({
userId,
taskType: 'sync_filings',
payload: {
payload: buildSyncFilingsPayload({
ticker,
limit: AUTO_FILING_SYNC_LIMIT
},
limit: AUTO_FILING_SYNC_LIMIT,
category: metadata?.category,
tags: metadata?.tags
}),
priority: 90,
resourceKey: `sync_filings:${ticker}`
});
@@ -228,7 +285,9 @@ export const app = new Elysia({ prefix: '/api' })
const payload = asRecord(body);
const ticker = typeof payload.ticker === 'string' ? payload.ticker.trim().toUpperCase() : '';
const companyName = typeof payload.companyName === 'string' ? payload.companyName.trim() : '';
const sector = typeof payload.sector === 'string' ? payload.sector.trim() : '';
const sector = asOptionalString(payload.sector) ?? '';
const category = asOptionalString(payload.category) ?? '';
const tags = asTags(payload.tags);
if (!ticker) {
return jsonError('ticker is required');
@@ -243,11 +302,16 @@ export const app = new Elysia({ prefix: '/api' })
userId: session.user.id,
ticker,
companyName,
sector
sector,
category,
tags
});
const autoFilingSyncQueued = created
? await queueAutoFilingSync(session.user.id, ticker)
? await queueAutoFilingSync(session.user.id, ticker, {
category: item.category,
tags: item.tags
})
: false;
return Response.json({ item, autoFilingSyncQueued });
@@ -258,7 +322,9 @@ export const app = new Elysia({ prefix: '/api' })
body: t.Object({
ticker: t.String({ minLength: 1 }),
companyName: t.String({ minLength: 1 }),
sector: t.Optional(t.String())
sector: t.Optional(t.String()),
category: t.Optional(t.String()),
tags: t.Optional(t.Union([t.Array(t.String()), t.String()]))
})
})
.delete('/watchlist/:id', async ({ params }) => {
@@ -524,6 +590,8 @@ export const app = new Elysia({ prefix: '/api' })
ticker,
companyName,
sector: watchlistItem?.sector ?? null,
category: watchlistItem?.category ?? null,
tags: watchlistItem?.tags ?? [],
cik: latestFiling?.cik ?? null
},
quote: liveQuote,
@@ -588,13 +656,16 @@ export const app = new Elysia({ prefix: '/api' })
if (shouldQueueSync) {
try {
const watchlistItem = await getWatchlistItemByTicker(session.user.id, ticker);
await enqueueTask({
userId: session.user.id,
taskType: 'sync_filings',
payload: {
payload: buildSyncFilingsPayload({
ticker,
limit: defaultFinancialSyncLimit(window)
},
limit: defaultFinancialSyncLimit(window),
category: watchlistItem?.category,
tags: watchlistItem?.tags
}),
priority: 88,
resourceKey: `sync_filings:${ticker}`
});
@@ -707,6 +778,8 @@ export const app = new Elysia({ prefix: '/api' })
const payload = asRecord(body);
const ticker = typeof payload.ticker === 'string' ? payload.ticker.trim().toUpperCase() : '';
const category = asOptionalString(payload.category);
const tags = asTags(payload.tags);
if (!ticker) {
return jsonError('ticker is required');
@@ -717,10 +790,12 @@ export const app = new Elysia({ prefix: '/api' })
const task = await enqueueTask({
userId: session.user.id,
taskType: 'sync_filings',
payload: {
payload: buildSyncFilingsPayload({
ticker,
limit: Number.isFinite(limit) ? limit : 20
},
limit: Number.isFinite(limit) ? limit : 20,
category,
tags
}),
priority: 90,
resourceKey: `sync_filings:${ticker}`
});
@@ -732,7 +807,9 @@ export const app = new Elysia({ prefix: '/api' })
}, {
body: t.Object({
ticker: t.String({ minLength: 1 }),
limit: t.Optional(t.Numeric())
limit: t.Optional(t.Numeric()),
category: t.Optional(t.String()),
tags: t.Optional(t.Union([t.Array(t.String()), t.String()]))
})
})
.post('/filings/:accessionNumber/analyze', async ({ params }) => {

View File

@@ -85,7 +85,8 @@ function applySqlMigrations(client: { exec: (query: string) => void }) {
'0000_cold_silver_centurion.sql',
'0001_glossy_statement_snapshots.sql',
'0002_workflow_task_projection_metadata.sql',
'0003_task_stage_event_timeline.sql'
'0003_task_stage_event_timeline.sql',
'0004_watchlist_company_taxonomy.sql'
];
for (const file of migrationFiles) {
@@ -215,6 +216,80 @@ if (process.env.RUN_TASK_WORKFLOW_E2E === '1') {
expect(tasks.every((task) => typeof task.workflow_run_id === 'string' && task.workflow_run_id.length > 0)).toBe(true);
});
it('persists watchlist category and tags and forwards them to auto sync task payload', async () => {
const created = await jsonRequest('POST', '/api/watchlist', {
ticker: 'shop',
companyName: 'Shopify Inc.',
sector: 'Technology',
category: 'core',
tags: ['growth', 'ecommerce', 'growth', ' ']
});
expect(created.response.status).toBe(200);
const createdBody = created.json as {
item: {
ticker: string;
category: string | null;
tags: string[];
};
autoFilingSyncQueued: boolean;
};
expect(createdBody.item.ticker).toBe('SHOP');
expect(createdBody.item.category).toBe('core');
expect(createdBody.item.tags).toEqual(['growth', 'ecommerce']);
expect(createdBody.autoFilingSyncQueued).toBe(true);
const tasksResponse = await jsonRequest('GET', '/api/tasks?limit=5');
expect(tasksResponse.response.status).toBe(200);
const task = (tasksResponse.json as {
tasks: Array<{
task_type: string;
payload: {
ticker?: string;
category?: string;
tags?: string[];
limit?: number;
};
}>;
}).tasks.find((entry) => entry.task_type === 'sync_filings');
expect(task).toBeTruthy();
expect(task?.payload.ticker).toBe('SHOP');
expect(task?.payload.limit).toBe(20);
expect(task?.payload.category).toBe('core');
expect(task?.payload.tags).toEqual(['growth', 'ecommerce']);
});
it('accepts category and comma-separated tags on manual filings sync payload', async () => {
const sync = await jsonRequest('POST', '/api/filings/sync', {
ticker: 'nvda',
limit: 15,
category: 'watch',
tags: 'semis, ai, semis'
});
expect(sync.response.status).toBe(200);
const task = (sync.json as {
task: {
task_type: string;
payload: {
ticker: string;
limit: number;
category?: string;
tags?: string[];
};
};
}).task;
expect(task.task_type).toBe('sync_filings');
expect(task.payload.ticker).toBe('NVDA');
expect(task.payload.limit).toBe(15);
expect(task.payload.category).toBe('watch');
expect(task.payload.tags).toEqual(['semis', 'ai']);
});
it('updates notification read and silenced state via patch endpoint', async () => {
const created = await jsonRequest('POST', '/api/filings/0000000000-26-000010/analyze');
const taskId = (created.json as { task: { id: string } }).task.id;

View File

@@ -204,6 +204,8 @@ export const watchlistItem = sqliteTable('watchlist_item', {
ticker: text('ticker').notNull(),
company_name: text('company_name').notNull(),
sector: text('sector'),
category: text('category'),
tags: text('tags', { mode: 'json' }).$type<string[]>(),
created_at: text('created_at').notNull()
}, (table) => ({
watchlistUserTickerUnique: uniqueIndex('watchlist_user_ticker_uidx').on(table.user_id, table.ticker),

View File

@@ -5,6 +5,33 @@ import { watchlistItem } from '@/lib/server/db/schema';
type WatchlistRow = typeof watchlistItem.$inferSelect;
function normalizeTags(tags?: string[]) {
if (!Array.isArray(tags)) {
return null;
}
const unique = new Set<string>();
for (const entry of tags) {
if (typeof entry !== 'string') {
continue;
}
const tag = entry.trim();
if (!tag) {
continue;
}
unique.add(tag);
}
if (unique.size === 0) {
return null;
}
return [...unique];
}
function toWatchlistItem(row: WatchlistRow): WatchlistItem {
return {
id: row.id,
@@ -12,6 +39,10 @@ function toWatchlistItem(row: WatchlistRow): WatchlistItem {
ticker: row.ticker,
company_name: row.company_name,
sector: row.sector,
category: row.category,
tags: Array.isArray(row.tags)
? row.tags.filter((entry): entry is string => typeof entry === 'string')
: [],
created_at: row.created_at
};
}
@@ -26,14 +57,33 @@ export async function listWatchlistItems(userId: string) {
return rows.map(toWatchlistItem);
}
export async function getWatchlistItemByTicker(userId: string, ticker: string) {
const normalizedTicker = ticker.trim().toUpperCase();
if (!normalizedTicker) {
return null;
}
const [row] = await db
.select()
.from(watchlistItem)
.where(and(eq(watchlistItem.user_id, userId), eq(watchlistItem.ticker, normalizedTicker)))
.limit(1);
return row ? toWatchlistItem(row) : null;
}
export async function upsertWatchlistItemRecord(input: {
userId: string;
ticker: string;
companyName: string;
sector?: string;
category?: string;
tags?: string[];
}) {
const normalizedTicker = input.ticker.trim().toUpperCase();
const normalizedSector = input.sector?.trim() ? input.sector.trim() : null;
const normalizedCategory = input.category?.trim() ? input.category.trim() : null;
const normalizedTags = normalizeTags(input.tags);
const now = new Date().toISOString();
const [inserted] = await db
@@ -43,6 +93,8 @@ export async function upsertWatchlistItemRecord(input: {
ticker: normalizedTicker,
company_name: input.companyName,
sector: normalizedSector,
category: normalizedCategory,
tags: normalizedTags,
created_at: now
})
.onConflictDoNothing({
@@ -61,7 +113,9 @@ export async function upsertWatchlistItemRecord(input: {
.update(watchlistItem)
.set({
company_name: input.companyName,
sector: normalizedSector
sector: normalizedSector,
category: normalizedCategory,
tags: normalizedTags
})
.where(and(eq(watchlistItem.user_id, input.userId), eq(watchlistItem.ticker, normalizedTicker)))
.returning();

View File

@@ -155,6 +155,37 @@ function parseLimit(raw: unknown, fallback: number, min: number, max: number) {
return Math.min(Math.max(intValue, min), max);
}
function parseOptionalText(raw: unknown) {
if (typeof raw !== 'string') {
return null;
}
const normalized = raw.trim();
return normalized.length > 0 ? normalized : null;
}
function parseTags(raw: unknown) {
if (!Array.isArray(raw)) {
return [];
}
const unique = new Set<string>();
for (const entry of raw) {
if (typeof entry !== 'string') {
continue;
}
const tag = entry.trim();
if (!tag) {
continue;
}
unique.add(tag);
}
return [...unique];
}
function sanitizeExtractionText(value: unknown, maxLength: number) {
if (typeof value !== 'string') {
return null;
@@ -519,8 +550,20 @@ function filingLinks(filing: {
async function processSyncFilings(task: Task) {
const ticker = parseTicker(task.payload.ticker);
const limit = parseLimit(task.payload.limit, 20, 1, 50);
const category = parseOptionalText(task.payload.category);
const tags = parseTags(task.payload.tags);
const scopeLabel = [
category,
tags.length > 0 ? `tags: ${tags.join(', ')}` : null
]
.filter((entry): entry is string => Boolean(entry))
.join(' | ');
await setProjectionStage(task, 'sync.fetch_filings', `Fetching up to ${limit} filings for ${ticker}`);
await setProjectionStage(
task,
'sync.fetch_filings',
`Fetching up to ${limit} filings for ${ticker}${scopeLabel ? ` (${scopeLabel})` : ''}`
);
const filings = await fetchRecentFilings(ticker, limit);
const metricsByAccession = new Map<string, Filing['metrics']>();
const filingsByCik = new Map<string, typeof filings>();
@@ -631,6 +674,8 @@ async function processSyncFilings(task: Task) {
return {
ticker,
category,
tags,
fetched: filings.length,
inserted: saveResult.inserted,
updated: saveResult.updated,