diff --git a/.env.example b/.env.example index 5b888fb..788f01a 100644 --- a/.env.example +++ b/.env.example @@ -28,7 +28,12 @@ OLLAMA_API_KEY=ollama # SEC API etiquette SEC_USER_AGENT=Fiscal Clone -# Workflow runtime (Local world) -WORKFLOW_TARGET_WORLD=local +# Workflow runtime (Coolify / production) +WORKFLOW_TARGET_WORLD=@workflow/world-postgres +WORKFLOW_POSTGRES_URL=postgres://workflow:workflow@workflow-postgres:5432/workflow +WORKFLOW_POSTGRES_WORKER_CONCURRENCY=10 +WORKFLOW_POSTGRES_JOB_PREFIX=fiscal_ + +# Optional local-world fallback for rollback/testing WORKFLOW_LOCAL_DATA_DIR=.workflow-data WORKFLOW_LOCAL_QUEUE_CONCURRENCY=100 diff --git a/Dockerfile b/Dockerfile index 08314d5..997721f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,10 +9,10 @@ ARG DATABASE_URL=file:data/fiscal.sqlite ENV NEXT_PUBLIC_API_URL=${NEXT_PUBLIC_API_URL} ENV DATABASE_URL=${DATABASE_URL} ENV NEXT_TELEMETRY_DISABLED=1 -ENV WORKFLOW_TARGET_WORLD=local +ENV WORKFLOW_TARGET_WORLD=@workflow/world-postgres ENV WORKFLOW_LOCAL_DATA_DIR=/app/.workflow-data COPY . . -RUN mkdir -p public /app/.workflow-data && WORKFLOW_TARGET_WORLD=local bun run build +RUN mkdir -p public /app/.workflow-data && bun run build FROM oven/bun:1.3.5-alpine AS runner WORKDIR /app @@ -21,7 +21,7 @@ ENV NODE_ENV=production ARG NEXT_PUBLIC_API_URL= ENV NEXT_PUBLIC_API_URL=${NEXT_PUBLIC_API_URL} ENV NEXT_TELEMETRY_DISABLED=1 -ENV WORKFLOW_TARGET_WORLD=local +ENV WORKFLOW_TARGET_WORLD=@workflow/world-postgres ENV WORKFLOW_LOCAL_DATA_DIR=/app/.workflow-data COPY --from=builder /app/public ./public @@ -42,4 +42,4 @@ EXPOSE 3000 ENV PORT=3000 -CMD ["sh", "-c", "./node_modules/.bin/drizzle-kit migrate --config /app/drizzle.config.ts && bun server.js"] +CMD ["sh", "-c", "if [ \"$WORKFLOW_TARGET_WORLD\" = \"@workflow/world-postgres\" ]; then ./node_modules/.bin/workflow-postgres-setup; fi && ./node_modules/.bin/drizzle-kit migrate --config /app/drizzle.config.ts && bun server.js"] diff --git a/README.md b/README.md index b2a4bc3..87daf5a 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ Turbopack-first rebuild of a fiscal.ai-style terminal with Vercel AI SDK integra - Drizzle ORM (SQLite) + Better Auth Drizzle adapter - Internal API routes via Elysia app module (`lib/server/api/app.ts`) - Eden Treaty for type-safe frontend API calls -- Workflow DevKit Local World for background task execution -- SQLite-backed domain storage (watchlist, holdings, filings, tasks, insights) +- Workflow DevKit Postgres World for background task execution durability +- SQLite-backed app domain storage (watchlist, holdings, filings, task projection, insights) - Vercel AI SDK (`ai`) with dual-model routing: - Ollama (`@ai-sdk/openai`) for lightweight filing extraction/parsing - Zhipu (`zhipu-ai-provider`) for heavyweight narrative reports (`https://api.z.ai/api/coding/paas/v4`) @@ -51,9 +51,11 @@ The app calls Zhipu directly via AI SDK for heavy reports and calls Ollama for l When running in Docker and Ollama runs on the host, set `OLLAMA_BASE_URL=http://host.docker.internal:11434`. Zhipu always targets the Coding API endpoint (`https://api.z.ai/api/coding/paas/v4`). On container startup, the app applies Drizzle migrations automatically before launching Next.js. -The app stores SQLite data in Docker volume `fiscal_sqlite_data` (mounted to `/app/data`) and workflow local data in `fiscal_workflow_data` (mounted to `/app/.workflow-data`). - -Workflow Local World uses filesystem state plus an in-memory queue. On container restart, queued in-flight jobs may be lost. +The app stores SQLite data in Docker volume `fiscal_sqlite_data` (mounted to `/app/data`) and workflow world data in Postgres volume `workflow_postgres_data`. +Container startup runs: +1. `workflow-postgres-setup` (idempotent Workflow world bootstrap) +2. Drizzle migrations for SQLite app tables +3. Next.js server boot Docker images use Bun (`oven/bun:1.3.5-alpine`) for build and runtime. @@ -67,16 +69,23 @@ Required environment variables in Coolify: - `BETTER_AUTH_SECRET=` - `BETTER_AUTH_BASE_URL=https://fiscal.b11studio.xyz` - `BETTER_AUTH_TRUSTED_ORIGINS=https://fiscal.b11studio.xyz` -- `WORKFLOW_TARGET_WORLD=local` -- Optional: `WORKFLOW_LOCAL_DATA_DIR=/app/.workflow-data` +- `WORKFLOW_TARGET_WORLD=@workflow/world-postgres` +- `WORKFLOW_POSTGRES_URL=postgres://workflow:workflow@workflow-postgres:5432/workflow` +- Optional: `WORKFLOW_POSTGRES_WORKER_CONCURRENCY=10` +- Optional: `WORKFLOW_POSTGRES_JOB_PREFIX=fiscal_` Operational constraints for Coolify: - Keep this service to a single instance/replica. SQLite is file-based and not appropriate for multi-replica shared-write deployments. -- Ensure the two named volumes are persisted (`fiscal_sqlite_data`, `fiscal_workflow_data`). -- Workflow Local queueing is in-memory; in-flight queued jobs may be lost on restarts. -- Docker build forces `WORKFLOW_TARGET_WORLD=local` to avoid stale Coolify build args referencing `@workflow/world-postgres`. -- Runtime Compose config also pins `WORKFLOW_TARGET_WORLD=local` for the same reason. +- Ensure both named volumes are persisted (`fiscal_sqlite_data`, `workflow_postgres_data`). +- Keep `WORKFLOW_POSTGRES_URL` explicit so Workflow does not fall back to `DATABASE_URL` (SQLite). +- The app `/api/health` probes Workflow backend connectivity and returns non-200 when Workflow world is unavailable. + +Emergency rollback path: + +1. Set `WORKFLOW_TARGET_WORLD=local` +2. Remove/disable `WORKFLOW_POSTGRES_URL` +3. Redeploy ## Environment @@ -100,7 +109,12 @@ OLLAMA_MODEL=qwen3:8b OLLAMA_API_KEY=ollama SEC_USER_AGENT=Fiscal Clone -WORKFLOW_TARGET_WORLD=local +WORKFLOW_TARGET_WORLD=@workflow/world-postgres +WORKFLOW_POSTGRES_URL=postgres://workflow:workflow@workflow-postgres:5432/workflow +WORKFLOW_POSTGRES_WORKER_CONCURRENCY=10 +WORKFLOW_POSTGRES_JOB_PREFIX=fiscal_ + +# Optional local-world fallback WORKFLOW_LOCAL_DATA_DIR=.workflow-data WORKFLOW_LOCAL_QUEUE_CONCURRENCY=100 ``` diff --git a/app/filings/page.tsx b/app/filings/page.tsx index 5be0d26..6da6e94 100644 --- a/app/filings/page.tsx +++ b/app/filings/page.tsx @@ -11,15 +11,13 @@ import { AppShell } from '@/components/shell/app-shell'; import { Panel } from '@/components/ui/panel'; import { Button } from '@/components/ui/button'; import { Input } from '@/components/ui/input'; -import { StatusPill } from '@/components/ui/status-pill'; import { useAuthGuard } from '@/hooks/use-auth-guard'; import { useLinkPrefetch } from '@/hooks/use-link-prefetch'; -import { useTaskPoller } from '@/hooks/use-task-poller'; import { queueFilingAnalysis, queueFilingSync } from '@/lib/api'; -import type { Filing, Task } from '@/lib/types'; +import type { Filing } from '@/lib/types'; import { formatCurrencyByScale, type NumberScaleUnit } from '@/lib/format'; import { queryKeys } from '@/lib/query/keys'; -import { filingsQueryOptions, taskQueryOptions } from '@/lib/query/options'; +import { filingsQueryOptions } from '@/lib/query/options'; const FINANCIAL_VALUE_SCALE_OPTIONS: Array<{ value: NumberScaleUnit; label: string }> = [ { value: 'thousands', label: 'Thousands (K)' }, @@ -115,7 +113,6 @@ function FilingsPageContent() { const [syncTickerInput, setSyncTickerInput] = useState(''); const [filterTickerInput, setFilterTickerInput] = useState(''); const [searchTicker, setSearchTicker] = useState(''); - const [activeTask, setActiveTask] = useState(null); const [financialValueScale, setFinancialValueScale] = useState('millions'); useEffect(() => { @@ -153,29 +150,16 @@ function FilingsPageContent() { } }, [isPending, isAuthenticated, searchTicker, loadFilings]); - const polledTask = useTaskPoller({ - taskId: activeTask?.id ?? null, - onTerminalState: async () => { - setActiveTask(null); - void queryClient.invalidateQueries({ queryKey: ['filings'] }); - void queryClient.invalidateQueries({ queryKey: queryKeys.recentTasks(20) }); - await loadFilings(searchTicker || undefined); - } - }); - - const liveTask = polledTask ?? activeTask; - const triggerSync = async () => { if (!syncTickerInput.trim()) { return; } try { - const { task } = await queueFilingSync({ ticker: syncTickerInput.trim().toUpperCase(), limit: 20 }); - const latest = await queryClient.fetchQuery(taskQueryOptions(task.id)); - setActiveTask(latest.task); + await queueFilingSync({ ticker: syncTickerInput.trim().toUpperCase(), limit: 20 }); void queryClient.invalidateQueries({ queryKey: queryKeys.recentTasks(20) }); void queryClient.invalidateQueries({ queryKey: ['filings'] }); + await loadFilings(searchTicker || undefined); } catch (err) { setError(err instanceof Error ? err.message : 'Failed to queue filing sync'); } @@ -183,9 +167,7 @@ function FilingsPageContent() { const triggerAnalysis = async (accessionNumber: string) => { try { - const { task } = await queueFilingAnalysis(accessionNumber); - const latest = await queryClient.fetchQuery(taskQueryOptions(task.id)); - setActiveTask(latest.task); + await queueFilingAnalysis(accessionNumber); void queryClient.invalidateQueries({ queryKey: queryKeys.recentTasks(20) }); void queryClient.invalidateQueries({ queryKey: ['report'] }); } catch (err) { @@ -230,16 +212,6 @@ function FilingsPageContent() { )} > - {liveTask ? ( - -
-

{liveTask.id}

- -
- {liveTask.error ?

{liveTask.error}

: null} -
- ) : null} -
(EMPTY_STATE); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); - const [activeTaskId, setActiveTaskId] = useState(null); const loadData = useCallback(async () => { const summaryOptions = portfolioSummaryQueryOptions(); @@ -102,30 +98,16 @@ export default function CommandCenterPage() { } }, [isPending, isAuthenticated, loadData]); - const trackedTask = useTaskPoller({ - taskId: activeTaskId, - onTerminalState: () => { - setActiveTaskId(null); - void queryClient.invalidateQueries({ queryKey: queryKeys.portfolioSummary() }); - void queryClient.invalidateQueries({ queryKey: queryKeys.latestPortfolioInsight() }); - void queryClient.invalidateQueries({ queryKey: queryKeys.recentTasks(20) }); - void queryClient.invalidateQueries({ queryKey: ['filings'] }); - void loadData(); - } - }); - const headerActions = ( <> +
+ + + {isLoading ? ( +
+ + Loading task timeline... +
+ ) : null} + + {error ? ( +

Unable to load task timeline.

+ ) : null} + + {task ? ( + <> +
+

Stage: {stageLabel(task.stage)}

+

Workflow run: {task.workflow_run_id ?? 'n/a'}

+

Created: {formatTimestamp(task.created_at)}

+

Finished: {formatTimestamp(task.finished_at)}

+

Updated: {formatTimestamp(task.updated_at)}

+

Attempts: {task.attempts}/{task.max_attempts}

+
+ +
+

Stage timeline

+
    + {timeline.map((item) => ( +
  1. +
    +

    {item.label}

    + + {item.state} + +
    + {item.detail ?

    {item.detail}

    : null} +

    {formatTimestamp(item.timestamp)}

    +
  2. + ))} +
+
+ + {task.error ? ( +
+

Error

+

{task.error}

+
+ ) : null} + + {task.result ? ( +
+

Result summary

+
{JSON.stringify(task.result, null, 2)}
+
+ ) : null} + + ) : null} + +
+ +
+ + + ); +} diff --git a/components/notifications/task-notifications-drawer.tsx b/components/notifications/task-notifications-drawer.tsx new file mode 100644 index 0000000..8d13570 --- /dev/null +++ b/components/notifications/task-notifications-drawer.tsx @@ -0,0 +1,181 @@ +'use client'; + +import { formatDistanceToNow } from 'date-fns'; +import { BellOff, CheckCheck, EyeOff, X } from 'lucide-react'; +import type { Task } from '@/lib/types'; +import { taskTypeLabel } from '@/components/notifications/task-stage-helpers'; +import { Button } from '@/components/ui/button'; +import { StatusPill } from '@/components/ui/status-pill'; + +type TaskNotificationsDrawerProps = { + isOpen: boolean; + onClose: () => void; + activeTasks: Task[]; + visibleFinishedTasks: Task[]; + awaitingReviewTasks: Task[]; + showReadFinished: boolean; + setShowReadFinished: (value: boolean) => void; + openTaskDetails: (taskId: string) => void; + markTaskRead: (taskId: string, read?: boolean) => Promise; + silenceTask: (taskId: string, silenced?: boolean) => Promise; +}; + +function TaskRow({ + task, + openTaskDetails, + markTaskRead, + silenceTask +}: { + task: Task; + openTaskDetails: (taskId: string) => void; + markTaskRead: (taskId: string, read?: boolean) => Promise; + silenceTask: (taskId: string, silenced?: boolean) => Promise; +}) { + const isTerminal = task.status === 'completed' || task.status === 'failed'; + const isRead = task.notification_read_at !== null; + + return ( +
+
+
+

{taskTypeLabel(task.task_type)}

+

{task.stage_detail ?? task.stage}

+

+ Updated {formatDistanceToNow(new Date(task.updated_at), { addSuffix: true })} +

+
+ +
+ +
+ + + {isTerminal ? ( + + ) : ( + + )} +
+
+ ); +} + +export function TaskNotificationsDrawer({ + isOpen, + onClose, + activeTasks, + visibleFinishedTasks, + awaitingReviewTasks, + showReadFinished, + setShowReadFinished, + openTaskDetails, + markTaskRead, + silenceTask +}: TaskNotificationsDrawerProps) { + if (!isOpen) { + return null; + } + + return ( +
+ + + +
+ +

Unread finished: {awaitingReviewTasks.length}

+
+ +
+

Active jobs

+
+ {activeTasks.length === 0 ? ( +

No active jobs.

+ ) : ( + activeTasks.map((task) => ( + + )) + )} +
+
+ +
+

Awaiting review

+
+ {visibleFinishedTasks.length === 0 ? ( +

No finished jobs to review.

+ ) : ( + visibleFinishedTasks.map((task) => ( + + )) + )} +
+
+ +
+ ); +} diff --git a/components/notifications/task-notifications-trigger.tsx b/components/notifications/task-notifications-trigger.tsx new file mode 100644 index 0000000..04affce --- /dev/null +++ b/components/notifications/task-notifications-trigger.tsx @@ -0,0 +1,180 @@ +'use client'; + +import { formatDistanceToNow } from 'date-fns'; +import { Bell, BellRing, ChevronRight } from 'lucide-react'; +import type { Task } from '@/lib/types'; +import { Button } from '@/components/ui/button'; +import { StatusPill } from '@/components/ui/status-pill'; +import { taskTypeLabel } from '@/components/notifications/task-stage-helpers'; +import { cn } from '@/lib/utils'; + +type TaskNotificationsTriggerProps = { + unreadCount: number; + isPopoverOpen: boolean; + setIsPopoverOpen: (value: boolean) => void; + activeTasks: Task[]; + awaitingReviewTasks: Task[]; + openDrawer: () => void; + openTaskDetails: (taskId: string) => void; + silenceTask: (taskId: string, silenced?: boolean) => Promise; + markTaskRead: (taskId: string, read?: boolean) => Promise; + className?: string; + mobile?: boolean; +}; + +export function TaskNotificationsTrigger({ + unreadCount, + isPopoverOpen, + setIsPopoverOpen, + activeTasks, + awaitingReviewTasks, + openDrawer, + openTaskDetails, + silenceTask, + markTaskRead, + className, + mobile = false +}: TaskNotificationsTriggerProps) { + const showPopover = !mobile; + + const button = ( + + ); + + if (!showPopover) { + return button; + } + + return ( +
+ {button} + {isPopoverOpen ? ( + <> + + +
+ + )) + )} + + +
+

Awaiting review

+ {awaitingReviewTasks.length === 0 ? ( +

No unread finished jobs.

+ ) : ( + awaitingReviewTasks.slice(0, 3).map((task) => ( +
+
+

{taskTypeLabel(task.task_type)}

+ +
+

+ {formatDistanceToNow(new Date(task.updated_at), { addSuffix: true })} +

+
+ + +
+
+ )) + )} +
+ + + + + ) : null} + + ); +} diff --git a/components/notifications/task-stage-helpers.ts b/components/notifications/task-stage-helpers.ts new file mode 100644 index 0000000..b026351 --- /dev/null +++ b/components/notifications/task-stage-helpers.ts @@ -0,0 +1,150 @@ +import type { Task, TaskStage, TaskStageEvent, TaskType } from '@/lib/types'; + +export type StageTimelineItem = { + stage: TaskStage; + label: string; + state: 'completed' | 'active' | 'pending'; + detail: string | null; + timestamp: string | null; +}; + +const TASK_TYPE_LABELS: Record = { + sync_filings: 'Filing sync', + refresh_prices: 'Price refresh', + analyze_filing: 'Filing analysis', + portfolio_insights: 'Portfolio insight' +}; + +const STAGE_LABELS: Record = { + queued: 'Queued', + running: 'Running', + completed: 'Completed', + failed: 'Failed', + 'sync.fetch_filings': 'Fetch filings', + 'sync.fetch_metrics': 'Fetch filing metrics', + 'sync.persist_filings': 'Persist filings', + 'sync.hydrate_statements': 'Hydrate statements', + 'refresh.load_holdings': 'Load holdings', + 'refresh.fetch_quotes': 'Fetch quotes', + 'refresh.persist_prices': 'Persist prices', + 'analyze.load_filing': 'Load filing', + 'analyze.fetch_document': 'Fetch primary document', + 'analyze.extract': 'Extract context', + 'analyze.generate_report': 'Generate report', + 'analyze.persist_report': 'Persist report', + 'insights.load_holdings': 'Load holdings', + 'insights.generate': 'Generate insight', + 'insights.persist': 'Persist insight' +}; + +const TASK_STAGE_ORDER: Record = { + sync_filings: [ + 'queued', + 'running', + 'sync.fetch_filings', + 'sync.fetch_metrics', + 'sync.persist_filings', + 'sync.hydrate_statements', + 'completed' + ], + refresh_prices: [ + 'queued', + 'running', + 'refresh.load_holdings', + 'refresh.fetch_quotes', + 'refresh.persist_prices', + 'completed' + ], + analyze_filing: [ + 'queued', + 'running', + 'analyze.load_filing', + 'analyze.fetch_document', + 'analyze.extract', + 'analyze.generate_report', + 'analyze.persist_report', + 'completed' + ], + portfolio_insights: [ + 'queued', + 'running', + 'insights.load_holdings', + 'insights.generate', + 'insights.persist', + 'completed' + ] +}; + +export function taskTypeLabel(taskType: TaskType) { + return TASK_TYPE_LABELS[taskType]; +} + +export function stageLabel(stage: TaskStage) { + return STAGE_LABELS[stage] ?? stage; +} + +export function buildStageTimeline(task: Task, events: TaskStageEvent[]): StageTimelineItem[] { + const baseOrder = TASK_STAGE_ORDER[task.task_type] ?? ['queued', 'running', 'completed']; + const orderedStages = [...baseOrder]; + + if (task.status === 'failed' && !orderedStages.includes('failed')) { + orderedStages.push('failed'); + } + + const latestEventByStage = new Map(); + for (const event of events) { + latestEventByStage.set(event.stage, event); + } + + return orderedStages.map((stage) => { + const event = latestEventByStage.get(stage); + + if (task.status === 'queued' || task.status === 'running') { + if (stage === task.stage) { + return { + stage, + label: stageLabel(stage), + state: 'active' as const, + detail: event?.stage_detail ?? task.stage_detail, + timestamp: event?.created_at ?? null + }; + } + + if (event) { + return { + stage, + label: stageLabel(stage), + state: 'completed' as const, + detail: event.stage_detail, + timestamp: event.created_at + }; + } + + return { + stage, + label: stageLabel(stage), + state: 'pending' as const, + detail: null, + timestamp: null + }; + } + + if (stage === task.stage || event) { + return { + stage, + label: stageLabel(stage), + state: 'completed' as const, + detail: event?.stage_detail ?? task.stage_detail, + timestamp: event?.created_at ?? task.finished_at + }; + } + + return { + stage, + label: stageLabel(stage), + state: 'pending' as const, + detail: null, + timestamp: null + }; + }); +} diff --git a/components/providers/query-provider.tsx b/components/providers/query-provider.tsx index 4a13d19..350afaf 100644 --- a/components/providers/query-provider.tsx +++ b/components/providers/query-provider.tsx @@ -2,6 +2,7 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; import { useState } from 'react'; +import { Toaster } from 'sonner'; type QueryProviderProps = { children: React.ReactNode; @@ -20,6 +21,15 @@ export function QueryProvider({ children }: QueryProviderProps) { return ( {children} + ); } diff --git a/components/shell/app-shell.tsx b/components/shell/app-shell.tsx index 5ffc902..786a953 100644 --- a/components/shell/app-shell.tsx +++ b/components/shell/app-shell.tsx @@ -7,6 +7,9 @@ import Link from 'next/link'; import { usePathname, useRouter, useSearchParams } from 'next/navigation'; import { useEffect, useMemo, useState } from 'react'; import { authClient } from '@/lib/auth-client'; +import { TaskDetailModal } from '@/components/notifications/task-detail-modal'; +import { TaskNotificationsDrawer } from '@/components/notifications/task-notifications-drawer'; +import { TaskNotificationsTrigger } from '@/components/notifications/task-notifications-trigger'; import { companyAnalysisQueryOptions, filingsQueryOptions, @@ -18,6 +21,7 @@ import { } from '@/lib/query/options'; import type { ActiveContext, NavGroup, NavItem } from '@/lib/types'; import { Button } from '@/components/ui/button'; +import { useTaskNotificationsCenter } from '@/hooks/use-task-notifications-center'; import { cn } from '@/lib/utils'; type AppShellProps = { @@ -186,6 +190,7 @@ export function AppShell({ title, subtitle, actions, activeTicker, breadcrumbs, const [isSigningOut, setIsSigningOut] = useState(false); const [isMoreOpen, setIsMoreOpen] = useState(false); + const notifications = useTaskNotificationsCenter(); const { data: session } = authClient.useSession(); const sessionUser = (session?.user ?? null) as { name?: string | null; email?: string | null; role?: unknown } | null; @@ -338,6 +343,11 @@ export function AppShell({ title, subtitle, actions, activeTicker, breadcrumbs, }; }, [navEntries]); + useEffect(() => { + notifications.setIsPopoverOpen(false); + setIsMoreOpen(false); + }, [pathname]); + const signOut = async () => { if (isSigningOut) { return; @@ -422,6 +432,17 @@ export function AppShell({ title, subtitle, actions, activeTicker, breadcrumbs, ) : null}
+ notifications.setIsDrawerOpen(true)} + openTaskDetails={notifications.openTaskDetails} + silenceTask={notifications.silenceTask} + markTaskRead={notifications.markTaskRead} + /> {actions}
) : null} + + notifications.setIsDrawerOpen(false)} + activeTasks={notifications.activeTasks} + visibleFinishedTasks={notifications.visibleFinishedTasks} + awaitingReviewTasks={notifications.awaitingReviewTasks} + showReadFinished={notifications.showReadFinished} + setShowReadFinished={notifications.setShowReadFinished} + openTaskDetails={notifications.openTaskDetails} + markTaskRead={notifications.markTaskRead} + silenceTask={notifications.silenceTask} + /> + + notifications.setIsDetailOpen(false)} + /> ); } diff --git a/docker-compose.yml b/docker-compose.yml index cf82dd6..fac1193 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,28 @@ services: + workflow-postgres: + image: postgres:16-alpine + restart: unless-stopped + environment: + POSTGRES_DB: ${WORKFLOW_POSTGRES_DB:-workflow} + POSTGRES_USER: ${WORKFLOW_POSTGRES_USER:-workflow} + POSTGRES_PASSWORD: ${WORKFLOW_POSTGRES_PASSWORD:-workflow} + volumes: + - workflow_postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${WORKFLOW_POSTGRES_USER:-workflow} -d ${WORKFLOW_POSTGRES_DB:-workflow} -h 127.0.0.1 || exit 1"] + interval: 10s + timeout: 5s + retries: 12 + app: build: context: . dockerfile: Dockerfile args: NEXT_PUBLIC_API_URL: ${NEXT_PUBLIC_API_URL:-} + depends_on: + workflow-postgres: + condition: service_healthy restart: unless-stopped env_file: - path: ./.env @@ -25,12 +43,14 @@ services: OLLAMA_MODEL: ${OLLAMA_MODEL:-qwen3:8b} OLLAMA_API_KEY: ${OLLAMA_API_KEY:-ollama} SEC_USER_AGENT: ${SEC_USER_AGENT:-Fiscal Clone } - WORKFLOW_TARGET_WORLD: local + WORKFLOW_TARGET_WORLD: ${WORKFLOW_TARGET_WORLD:-@workflow/world-postgres} + WORKFLOW_POSTGRES_URL: ${WORKFLOW_POSTGRES_URL:-postgres://workflow:workflow@workflow-postgres:5432/workflow} + WORKFLOW_POSTGRES_WORKER_CONCURRENCY: ${WORKFLOW_POSTGRES_WORKER_CONCURRENCY:-10} + WORKFLOW_POSTGRES_JOB_PREFIX: ${WORKFLOW_POSTGRES_JOB_PREFIX:-fiscal_} WORKFLOW_LOCAL_DATA_DIR: ${WORKFLOW_LOCAL_DATA_DIR:-/app/.workflow-data} WORKFLOW_LOCAL_QUEUE_CONCURRENCY: ${WORKFLOW_LOCAL_QUEUE_CONCURRENCY:-100} volumes: - fiscal_sqlite_data:/app/data - - fiscal_workflow_data:/app/.workflow-data healthcheck: test: ["CMD-SHELL", "wget -q --spider http://127.0.0.1:3000/api/health || exit 1"] interval: 30s @@ -41,4 +61,4 @@ services: volumes: fiscal_sqlite_data: - fiscal_workflow_data: + workflow_postgres_data: diff --git a/drizzle/0002_workflow_task_projection_metadata.sql b/drizzle/0002_workflow_task_projection_metadata.sql new file mode 100644 index 0000000..c121691 --- /dev/null +++ b/drizzle/0002_workflow_task_projection_metadata.sql @@ -0,0 +1,11 @@ +ALTER TABLE `task_run` ADD `stage` text NOT NULL DEFAULT 'queued'; +--> statement-breakpoint +ALTER TABLE `task_run` ADD `stage_detail` text; +--> statement-breakpoint +ALTER TABLE `task_run` ADD `resource_key` text; +--> statement-breakpoint +ALTER TABLE `task_run` ADD `notification_read_at` text; +--> statement-breakpoint +ALTER TABLE `task_run` ADD `notification_silenced_at` text; +--> statement-breakpoint +CREATE INDEX `task_user_resource_status_idx` ON `task_run` (`user_id`,`task_type`,`resource_key`,`status`,`created_at`); diff --git a/drizzle/0003_task_stage_event_timeline.sql b/drizzle/0003_task_stage_event_timeline.sql new file mode 100644 index 0000000..499cb8e --- /dev/null +++ b/drizzle/0003_task_stage_event_timeline.sql @@ -0,0 +1,15 @@ +CREATE TABLE `task_stage_event` ( + `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL, + `task_id` text NOT NULL, + `user_id` text NOT NULL, + `stage` text NOT NULL, + `stage_detail` text, + `status` text NOT NULL, + `created_at` text NOT NULL, + FOREIGN KEY (`task_id`) REFERENCES `task_run`(`id`) ON UPDATE no action ON DELETE cascade, + FOREIGN KEY (`user_id`) REFERENCES `user`(`id`) ON UPDATE no action ON DELETE cascade +); +--> statement-breakpoint +CREATE INDEX `task_stage_event_task_created_idx` ON `task_stage_event` (`task_id`,`created_at`); +--> statement-breakpoint +CREATE INDEX `task_stage_event_user_created_idx` ON `task_stage_event` (`user_id`,`created_at`); diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json index c35c118..9c20f18 100644 --- a/drizzle/meta/_journal.json +++ b/drizzle/meta/_journal.json @@ -15,6 +15,20 @@ "when": 1772417400000, "tag": "0001_glossy_statement_snapshots", "breakpoints": true + }, + { + "idx": 2, + "version": "6", + "when": 1772450400000, + "tag": "0002_workflow_task_projection_metadata", + "breakpoints": true + }, + { + "idx": 3, + "version": "6", + "when": 1772486100000, + "tag": "0003_task_stage_event_timeline", + "breakpoints": true } ] } diff --git a/hooks/use-api-queries.ts b/hooks/use-api-queries.ts index 3ad9687..a5b058f 100644 --- a/hooks/use-api-queries.ts +++ b/hooks/use-api-queries.ts @@ -10,6 +10,7 @@ import { portfolioSummaryQueryOptions, recentTasksQueryOptions, taskQueryOptions, + taskTimelineQueryOptions, watchlistQueryOptions } from '@/lib/query/options'; @@ -69,6 +70,13 @@ export function useTaskQuery(taskId: string, enabled = true) { }); } +export function useTaskTimelineQuery(taskId: string, enabled = true) { + return useQuery({ + ...taskTimelineQueryOptions(taskId), + enabled: enabled && taskId.length > 0 + }); +} + export function useRecentTasksQuery(limit = 20, enabled = true) { return useQuery({ ...recentTasksQueryOptions(limit), diff --git a/hooks/use-task-notifications-center.ts b/hooks/use-task-notifications-center.ts new file mode 100644 index 0000000..658a7d1 --- /dev/null +++ b/hooks/use-task-notifications-center.ts @@ -0,0 +1,405 @@ +'use client'; + +import { useQueryClient } from '@tanstack/react-query'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; +import { toast } from 'sonner'; +import { + listRecentTasks, + updateTaskNotificationState +} from '@/lib/api'; +import type { Task, TaskStatus } from '@/lib/types'; + +const ACTIVE_STATUSES: TaskStatus[] = ['queued', 'running']; +const TERMINAL_STATUSES: TaskStatus[] = ['completed', 'failed']; + +function isTerminalTask(task: Task) { + return TERMINAL_STATUSES.includes(task.status); +} + +function taskSignature(task: Task) { + return `${task.status}|${task.stage}|${task.stage_detail ?? ''}|${task.error ?? ''}`; +} + +function taskTitle(task: Task) { + switch (task.task_type) { + case 'sync_filings': + return 'Filing sync'; + case 'refresh_prices': + return 'Price refresh'; + case 'analyze_filing': + return 'Filing analysis'; + case 'portfolio_insights': + return 'Portfolio insight'; + default: + return 'Task'; + } +} + +function taskDescription(task: Task) { + if (task.error && task.status === 'failed') { + return task.error; + } + + if (task.stage_detail) { + return task.stage_detail; + } + + switch (task.status) { + case 'queued': + return 'Queued and waiting for execution.'; + case 'running': + return 'Running in workflow engine.'; + case 'completed': + return 'Task finished successfully.'; + case 'failed': + return 'Task failed.'; + default: + return 'Task status changed.'; + } +} + +function shouldNotifyTask(task: Task) { + return !task.notification_silenced_at; +} + +function isUnread(task: Task) { + return task.notification_read_at === null; +} + +type UseTaskNotificationsCenterResult = { + activeTasks: Task[]; + finishedTasks: Task[]; + unreadCount: number; + awaitingReviewTasks: Task[]; + visibleFinishedTasks: Task[]; + showReadFinished: boolean; + setShowReadFinished: (value: boolean) => void; + isPopoverOpen: boolean; + setIsPopoverOpen: (value: boolean) => void; + isDrawerOpen: boolean; + setIsDrawerOpen: (value: boolean) => void; + detailTaskId: string | null; + setDetailTaskId: (value: string | null) => void; + isDetailOpen: boolean; + setIsDetailOpen: (value: boolean) => void; + openTaskDetails: (taskId: string) => void; + markTaskRead: (taskId: string, read?: boolean) => Promise; + silenceTask: (taskId: string, silenced?: boolean) => Promise; + refreshTasks: () => Promise; +}; + +export function useTaskNotificationsCenter(): UseTaskNotificationsCenterResult { + const queryClient = useQueryClient(); + const [activeTasks, setActiveTasks] = useState([]); + const [finishedTasks, setFinishedTasks] = useState([]); + const [showReadFinished, setShowReadFinished] = useState(false); + const [isPopoverOpen, setIsPopoverOpen] = useState(false); + const [isDrawerOpen, setIsDrawerOpen] = useState(false); + const [detailTaskId, setDetailTaskId] = useState(null); + const [isDetailOpen, setIsDetailOpen] = useState(false); + + const activeLoadedRef = useRef(false); + const finishedLoadedRef = useRef(false); + const stateSignaturesRef = useRef(new Map()); + const invalidatedTerminalRef = useRef(new Set()); + const activeSnapshotRef = useRef([]); + const finishedSnapshotRef = useRef([]); + + const applyTaskLocally = useCallback((task: Task) => { + setActiveTasks((prev) => prev.map((entry) => (entry.id === task.id ? task : entry))); + setFinishedTasks((prev) => prev.map((entry) => (entry.id === task.id ? task : entry))); + }, []); + + const invalidateForTerminalTask = useCallback((task: Task) => { + const key = `${task.id}:${task.status}`; + if (invalidatedTerminalRef.current.has(key)) { + return; + } + + invalidatedTerminalRef.current.add(key); + void queryClient.invalidateQueries({ queryKey: ['tasks'] }); + + switch (task.task_type) { + case 'sync_filings': { + void queryClient.invalidateQueries({ queryKey: ['filings'] }); + void queryClient.invalidateQueries({ queryKey: ['analysis'] }); + void queryClient.invalidateQueries({ queryKey: ['financials-v2'] }); + break; + } + case 'analyze_filing': { + void queryClient.invalidateQueries({ queryKey: ['filings'] }); + void queryClient.invalidateQueries({ queryKey: ['report'] }); + void queryClient.invalidateQueries({ queryKey: ['analysis'] }); + break; + } + case 'refresh_prices': { + void queryClient.invalidateQueries({ queryKey: ['portfolio', 'holdings'] }); + void queryClient.invalidateQueries({ queryKey: ['portfolio', 'summary'] }); + break; + } + case 'portfolio_insights': { + void queryClient.invalidateQueries({ queryKey: ['portfolio', 'insights', 'latest'] }); + break; + } + default: + break; + } + }, [queryClient]); + + const openTaskDetails = useCallback((taskId: string) => { + setDetailTaskId(taskId); + setIsDetailOpen(true); + setIsDrawerOpen(true); + setIsPopoverOpen(false); + }, []); + + const silenceTask = useCallback(async (taskId: string, silenced = true) => { + try { + const { task } = await updateTaskNotificationState(taskId, { silenced }); + applyTaskLocally(task); + toast.dismiss(taskId); + } catch { + toast.error('Unable to update notification state'); + } + }, [applyTaskLocally]); + + const markTaskRead = useCallback(async (taskId: string, read = true) => { + try { + const { task } = await updateTaskNotificationState(taskId, { read }); + applyTaskLocally(task); + if (read) { + toast.dismiss(taskId); + } + } catch { + toast.error('Unable to update notification state'); + } + }, [applyTaskLocally]); + + const emitTaskToast = useCallback((task: Task) => { + if (!shouldNotifyTask(task)) { + toast.dismiss(task.id); + return; + } + + if (task.status === 'queued' || task.status === 'running') { + toast(taskTitle(task), { + id: task.id, + duration: Number.POSITIVE_INFINITY, + description: taskDescription(task), + action: { + label: 'Open details', + onClick: () => openTaskDetails(task.id) + }, + cancel: { + label: 'Silence', + onClick: () => { + void silenceTask(task.id, true); + } + } + }); + return; + } + + const toastBuilder = task.status === 'completed' ? toast.success : toast.error; + + toastBuilder(taskTitle(task), { + id: task.id, + duration: 10_000, + description: taskDescription(task), + action: { + label: 'Open details', + onClick: () => openTaskDetails(task.id) + }, + cancel: { + label: 'Mark read', + onClick: () => { + void markTaskRead(task.id, true); + } + } + }); + }, [markTaskRead, openTaskDetails, silenceTask]); + + const processSnapshots = useCallback(() => { + const active = activeSnapshotRef.current; + const finished = finishedSnapshotRef.current; + const all = [...active, ...finished]; + + if (!activeLoadedRef.current || !finishedLoadedRef.current) { + return; + } + + if (stateSignaturesRef.current.size === 0) { + for (const task of all) { + stateSignaturesRef.current.set(task.id, taskSignature(task)); + } + return; + } + + for (const task of all) { + const signature = taskSignature(task); + const previousSignature = stateSignaturesRef.current.get(task.id); + const wasKnown = previousSignature !== undefined; + + if (!wasKnown || previousSignature !== signature) { + emitTaskToast(task); + + if (isTerminalTask(task)) { + invalidateForTerminalTask(task); + } + } + + stateSignaturesRef.current.set(task.id, signature); + } + + const currentIds = new Set(all.map((task) => task.id)); + for (const knownId of [...stateSignaturesRef.current.keys()]) { + if (!currentIds.has(knownId)) { + toast.dismiss(knownId); + } + } + }, [emitTaskToast, invalidateForTerminalTask]); + + const refreshTasks = useCallback(async () => { + try { + const [activeRes, finishedRes] = await Promise.all([ + listRecentTasks({ + limit: 80, + statuses: ACTIVE_STATUSES + }), + listRecentTasks({ + limit: 120, + statuses: TERMINAL_STATUSES + }) + ]); + + activeSnapshotRef.current = activeRes.tasks; + finishedSnapshotRef.current = finishedRes.tasks; + activeLoadedRef.current = true; + finishedLoadedRef.current = true; + setActiveTasks(activeRes.tasks); + setFinishedTasks(finishedRes.tasks); + processSnapshots(); + } catch { + // ignore transient polling failures + } + }, [processSnapshots]); + + useEffect(() => { + let cancelled = false; + let activeTimer: ReturnType | null = null; + let terminalTimer: ReturnType | null = null; + + const runActiveLoop = async () => { + if (cancelled) { + return; + } + + try { + const response = await listRecentTasks({ + limit: 80, + statuses: ACTIVE_STATUSES + }); + + if (cancelled) { + return; + } + + activeSnapshotRef.current = response.tasks; + activeLoadedRef.current = true; + setActiveTasks(response.tasks); + processSnapshots(); + } catch { + // ignore transient polling failures + } + + activeTimer = setTimeout(runActiveLoop, 2_000); + }; + + const runTerminalLoop = async () => { + if (cancelled) { + return; + } + + try { + const response = await listRecentTasks({ + limit: 120, + statuses: TERMINAL_STATUSES + }); + + if (cancelled) { + return; + } + + finishedSnapshotRef.current = response.tasks; + finishedLoadedRef.current = true; + setFinishedTasks(response.tasks); + processSnapshots(); + } catch { + // ignore transient polling failures + } + + terminalTimer = setTimeout(runTerminalLoop, 4_000); + }; + + void runActiveLoop(); + void runTerminalLoop(); + + return () => { + cancelled = true; + if (activeTimer) { + clearTimeout(activeTimer); + } + if (terminalTimer) { + clearTimeout(terminalTimer); + } + }; + }, [processSnapshots]); + + const normalizedActiveTasks = useMemo(() => { + return activeTasks.filter((task) => ACTIVE_STATUSES.includes(task.status)); + }, [activeTasks]); + + const normalizedFinishedTasks = useMemo(() => { + return finishedTasks.filter((task) => TERMINAL_STATUSES.includes(task.status)); + }, [finishedTasks]); + + const awaitingReviewTasks = useMemo(() => { + return normalizedFinishedTasks.filter((task) => isUnread(task)); + }, [normalizedFinishedTasks]); + + const visibleFinishedTasks = useMemo(() => { + if (showReadFinished) { + return normalizedFinishedTasks; + } + + return awaitingReviewTasks; + }, [awaitingReviewTasks, normalizedFinishedTasks, showReadFinished]); + + const unreadCount = useMemo(() => { + const unreadTerminal = normalizedFinishedTasks.filter((task) => isUnread(task)).length; + const unreadActive = normalizedActiveTasks.filter((task) => isUnread(task) && !task.notification_silenced_at).length; + return unreadTerminal + unreadActive; + }, [normalizedActiveTasks, normalizedFinishedTasks]); + + return { + activeTasks: normalizedActiveTasks, + finishedTasks: normalizedFinishedTasks, + unreadCount, + awaitingReviewTasks, + visibleFinishedTasks, + showReadFinished, + setShowReadFinished, + isPopoverOpen, + setIsPopoverOpen, + isDrawerOpen, + setIsDrawerOpen, + detailTaskId, + setDetailTaskId, + isDetailOpen, + setIsDetailOpen, + openTaskDetails, + markTaskRead, + silenceTask, + refreshTasks + }; +} diff --git a/lib/api.ts b/lib/api.ts index 2e06939..dfec448 100644 --- a/lib/api.ts +++ b/lib/api.ts @@ -12,6 +12,8 @@ import type { PortfolioInsight, PortfolioSummary, Task, + TaskStatus, + TaskTimeline, User, WatchlistItem } from './types'; @@ -244,10 +246,29 @@ export async function getTask(taskId: string) { return await unwrapData<{ task: Task }>(result, 'Unable to fetch task'); } -export async function listRecentTasks(limit = 20) { +export async function getTaskTimeline(taskId: string) { + const result = await client.api.tasks[taskId].timeline.get(); + return await unwrapData(result, 'Unable to fetch task timeline'); +} + +export async function updateTaskNotificationState( + taskId: string, + input: { read?: boolean; silenced?: boolean } +) { + const result = await client.api.tasks[taskId].notification.patch(input); + return await unwrapData<{ task: Task }>(result, 'Unable to update task notification state'); +} + +export async function listRecentTasks(input: { + limit?: number; + statuses?: TaskStatus[]; +} = {}) { const result = await client.api.tasks.get({ $query: { - limit + limit: input.limit ?? 20, + ...(input.statuses && input.statuses.length > 0 + ? { status: input.statuses } + : {}) } }); diff --git a/lib/query/keys.ts b/lib/query/keys.ts index 55a31c3..6446e93 100644 --- a/lib/query/keys.ts +++ b/lib/query/keys.ts @@ -16,5 +16,6 @@ export const queryKeys = { portfolioSummary: () => ['portfolio', 'summary'] as const, latestPortfolioInsight: () => ['portfolio', 'insights', 'latest'] as const, task: (taskId: string) => ['tasks', 'detail', taskId] as const, + taskTimeline: (taskId: string) => ['tasks', 'timeline', taskId] as const, recentTasks: (limit: number) => ['tasks', 'recent', limit] as const }; diff --git a/lib/query/options.ts b/lib/query/options.ts index 8282cee..1278c75 100644 --- a/lib/query/options.ts +++ b/lib/query/options.ts @@ -6,6 +6,7 @@ import { getLatestPortfolioInsight, getPortfolioSummary, getTask, + getTaskTimeline, listFilings, listHoldings, listRecentTasks, @@ -126,10 +127,18 @@ export function taskQueryOptions(taskId: string) { }); } -export function recentTasksQueryOptions(limit = 20) { +export function taskTimelineQueryOptions(taskId: string) { return queryOptions({ - queryKey: queryKeys.recentTasks(limit), - queryFn: () => listRecentTasks(limit), + queryKey: queryKeys.taskTimeline(taskId), + queryFn: () => getTaskTimeline(taskId), + staleTime: 5_000 + }); +} + +export function recentTasksQueryOptions(limit = 20) { + return queryOptions({ + queryKey: queryKeys.recentTasks(limit), + queryFn: () => listRecentTasks({ limit }), staleTime: 5_000 }); } diff --git a/lib/server/api/app.ts b/lib/server/api/app.ts index 434d9d8..900aa89 100644 --- a/lib/server/api/app.ts +++ b/lib/server/api/app.ts @@ -1,4 +1,5 @@ import { Elysia, t } from 'elysia'; +import { getWorld } from 'workflow/runtime'; import type { Filing, FinancialHistoryWindow, @@ -31,9 +32,12 @@ import { import { getPriceHistory, getQuote } from '@/lib/server/prices'; import { enqueueTask, + findInFlightTask, getTaskById, + getTaskTimeline, getTaskQueueSnapshot, - listRecentTasks + listRecentTasks, + updateTaskNotification } from '@/lib/server/tasks'; const ALLOWED_STATUSES: TaskStatus[] = ['queued', 'running', 'completed', 'failed']; @@ -120,7 +124,8 @@ async function queueAutoFilingSync(userId: string, ticker: string) { ticker, limit: AUTO_FILING_SYNC_LIMIT }, - priority: 90 + priority: 90, + resourceKey: `sync_filings:${ticker}` }); return true; @@ -132,18 +137,63 @@ async function queueAutoFilingSync(userId: string, ticker: string) { const authHandler = ({ request }: { request: Request }) => auth.handler(request); +async function checkWorkflowBackend() { + try { + const world = getWorld(); + await world.runs.list({ + pagination: { limit: 1 }, + resolveData: 'none' + }); + + return { ok: true } as const; + } catch (error) { + return { + ok: false, + reason: asErrorMessage(error, 'Workflow backend unavailable') + } as const; + } +} + export const app = new Elysia({ prefix: '/api' }) .all('/auth', authHandler) .all('/auth/*', authHandler) .get('/health', async () => { - const queue = await getTaskQueueSnapshot(); + try { + const [queue, workflowBackend] = await Promise.all([ + getTaskQueueSnapshot(), + checkWorkflowBackend() + ]); - return Response.json({ - status: 'ok', - version: '4.0.0', - timestamp: new Date().toISOString(), - queue - }); + if (!workflowBackend.ok) { + return Response.json({ + status: 'degraded', + version: '4.0.0', + timestamp: new Date().toISOString(), + queue, + workflow: { + ok: false, + reason: workflowBackend.reason + } + }, { status: 503 }); + } + + return Response.json({ + status: 'ok', + version: '4.0.0', + timestamp: new Date().toISOString(), + queue, + workflow: { + ok: true + } + }); + } catch (error) { + return Response.json({ + status: 'degraded', + version: '4.0.0', + timestamp: new Date().toISOString(), + error: asErrorMessage(error, 'Health check failed') + }, { status: 503 }); + } }) .get('/me', async () => { const { session, response } = await requireAuthenticatedSession(); @@ -375,7 +425,8 @@ export const app = new Elysia({ prefix: '/api' }) userId: session.user.id, taskType: 'refresh_prices', payload: {}, - priority: 80 + priority: 80, + resourceKey: 'refresh_prices:portfolio' }); return Response.json({ task }); @@ -394,7 +445,8 @@ export const app = new Elysia({ prefix: '/api' }) userId: session.user.id, taskType: 'portfolio_insights', payload: {}, - priority: 70 + priority: 70, + resourceKey: 'portfolio_insights:portfolio' }); return Response.json({ task }); @@ -543,7 +595,8 @@ export const app = new Elysia({ prefix: '/api' }) ticker, limit: defaultFinancialSyncLimit(window) }, - priority: 88 + priority: 88, + resourceKey: `sync_filings:${ticker}` }); queuedSync = true; } catch (error) { @@ -668,7 +721,8 @@ export const app = new Elysia({ prefix: '/api' }) ticker, limit: Number.isFinite(limit) ? limit : 20 }, - priority: 90 + priority: 90, + resourceKey: `sync_filings:${ticker}` }); return Response.json({ task }); @@ -693,11 +747,23 @@ export const app = new Elysia({ prefix: '/api' }) } try { + const resourceKey = `analyze_filing:${accessionNumber}`; + const existing = await findInFlightTask( + session.user.id, + 'analyze_filing', + resourceKey + ); + + if (existing) { + return Response.json({ task: existing }); + } + const task = await enqueueTask({ userId: session.user.id, taskType: 'analyze_filing', payload: { accessionNumber }, - priority: 65 + priority: 65, + resourceKey }); return Response.json({ task }); @@ -760,6 +826,56 @@ export const app = new Elysia({ prefix: '/api' }) params: t.Object({ taskId: t.String({ minLength: 1 }) }) + }) + .get('/tasks/:taskId/timeline', async ({ params }) => { + const { session, response } = await requireAuthenticatedSession(); + if (response) { + return response; + } + + const timeline = await getTaskTimeline(params.taskId, session.user.id); + if (!timeline) { + return jsonError('Task not found', 404); + } + + return Response.json(timeline); + }, { + params: t.Object({ + taskId: t.String({ minLength: 1 }) + }) + }) + .patch('/tasks/:taskId/notification', async ({ params, body }) => { + const { session, response } = await requireAuthenticatedSession(); + if (response) { + return response; + } + + const payload = asRecord(body); + const read = typeof payload.read === 'boolean' ? payload.read : undefined; + const silenced = typeof payload.silenced === 'boolean' ? payload.silenced : undefined; + + if (read === undefined && silenced === undefined) { + return jsonError('read or silenced must be provided'); + } + + const task = await updateTaskNotification(session.user.id, params.taskId, { + read, + silenced + }); + + if (!task) { + return jsonError('Task not found', 404); + } + + return Response.json({ task }); + }, { + params: t.Object({ + taskId: t.String({ minLength: 1 }) + }), + body: t.Object({ + read: t.Optional(t.Boolean()), + silenced: t.Optional(t.Boolean()) + }) }); export type App = typeof app; diff --git a/lib/server/api/task-workflow-hybrid.e2e.test.ts b/lib/server/api/task-workflow-hybrid.e2e.test.ts new file mode 100644 index 0000000..bc8501c --- /dev/null +++ b/lib/server/api/task-workflow-hybrid.e2e.test.ts @@ -0,0 +1,319 @@ +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + it, + mock +} from 'bun:test'; +import { mkdtempSync, readFileSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import type { WorkflowRunStatus } from '@workflow/world'; + +const TEST_USER_ID = 'e2e-user'; +const TEST_USER_EMAIL = 'e2e@example.com'; +const TEST_USER_NAME = 'E2E User'; + +const runStatuses = new Map(); +let runCounter = 0; +let workflowBackendHealthy = true; + +let tempDir: string | null = null; +let sqliteClient: { exec: (query: string) => void; close: () => void } | null = null; +let app: { handle: (request: Request) => Promise } | null = null; + +mock.module('workflow/api', () => ({ + start: mock(async () => { + runCounter += 1; + const runId = `run-${runCounter}`; + runStatuses.set(runId, 'pending'); + + return { runId }; + }), + getRun: mock((runId: string) => ({ + get status() { + return Promise.resolve(runStatuses.get(runId) ?? 'pending'); + } + })) +})); + +mock.module('workflow/runtime', () => ({ + getWorld: () => ({ + runs: { + list: async () => { + if (!workflowBackendHealthy) { + throw new Error('Workflow backend unavailable'); + } + + return { + data: [] + }; + } + } + }) +})); + +mock.module('@/lib/server/auth-session', () => ({ + requireAuthenticatedSession: async () => ({ + session: { + user: { + id: TEST_USER_ID, + email: TEST_USER_EMAIL, + name: TEST_USER_NAME, + image: null + } + }, + response: null + }) +})); + +function resetDbSingletons() { + const globalState = globalThis as typeof globalThis & { + __fiscalSqliteClient?: { close?: () => void }; + __fiscalDrizzleDb?: unknown; + }; + + globalState.__fiscalSqliteClient?.close?.(); + globalState.__fiscalSqliteClient = undefined; + globalState.__fiscalDrizzleDb = undefined; +} + +function applySqlMigrations(client: { exec: (query: string) => void }) { + const migrationFiles = [ + '0000_cold_silver_centurion.sql', + '0001_glossy_statement_snapshots.sql', + '0002_workflow_task_projection_metadata.sql', + '0003_task_stage_event_timeline.sql' + ]; + + for (const file of migrationFiles) { + const sql = readFileSync(join(process.cwd(), 'drizzle', file), 'utf8'); + client.exec(sql); + } +} + +function ensureTestUser(client: { exec: (query: string) => void }) { + const now = Date.now(); + + client.exec(` + INSERT OR REPLACE INTO user ( + id, name, email, emailVerified, image, createdAt, updatedAt, role, banned, banReason, banExpires + ) VALUES ( + '${TEST_USER_ID}', + '${TEST_USER_NAME}', + '${TEST_USER_EMAIL}', + 1, + NULL, + ${now}, + ${now}, + NULL, + 0, + NULL, + NULL + ); + `); +} + +function clearProjectionTables(client: { exec: (query: string) => void }) { + client.exec('DELETE FROM task_stage_event;'); + client.exec('DELETE FROM task_run;'); +} + +async function jsonRequest( + method: 'GET' | 'POST' | 'PATCH', + path: string, + body?: Record +) { + if (!app) { + throw new Error('app not initialized'); + } + + const response = await app.handle(new Request(`http://localhost${path}`, { + method, + headers: body ? { 'content-type': 'application/json' } : undefined, + body: body ? JSON.stringify(body) : undefined + })); + + return { + response, + json: await response.json() + }; +} + +if (process.env.RUN_TASK_WORKFLOW_E2E === '1') { + describe('task workflow hybrid migration e2e', () => { + beforeAll(async () => { + tempDir = mkdtempSync(join(tmpdir(), 'fiscal-task-e2e-')); + const env = process.env as Record; + env.DATABASE_URL = `file:${join(tempDir, 'e2e.sqlite')}`; + env.NODE_ENV = 'test'; + + resetDbSingletons(); + + const dbModule = await import('@/lib/server/db'); + sqliteClient = dbModule.getSqliteClient(); + applySqlMigrations(sqliteClient); + ensureTestUser(sqliteClient); + + const appModule = await import('./app'); + app = appModule.app; + }); + + afterAll(() => { + resetDbSingletons(); + + if (tempDir) { + rmSync(tempDir, { recursive: true, force: true }); + } + }); + + beforeEach(() => { + if (!sqliteClient) { + throw new Error('sqlite client not initialized'); + } + + clearProjectionTables(sqliteClient); + runStatuses.clear(); + runCounter = 0; + workflowBackendHealthy = true; + }); + + it('queues multiple analyze jobs and suppresses duplicate in-flight analyze jobs', async () => { + const first = await jsonRequest('POST', '/api/filings/0000000000-26-000001/analyze'); + expect(first.response.status).toBe(200); + const firstTaskId = (first.json as { task: { id: string } }).task.id; + + const [second, third] = await Promise.all([ + jsonRequest('POST', '/api/filings/0000000000-26-000002/analyze'), + jsonRequest('POST', '/api/filings/0000000000-26-000003/analyze') + ]); + + expect(second.response.status).toBe(200); + expect(third.response.status).toBe(200); + + const duplicate = await jsonRequest('POST', '/api/filings/0000000000-26-000001/analyze'); + expect(duplicate.response.status).toBe(200); + expect((duplicate.json as { task: { id: string } }).task.id).toBe(firstTaskId); + + const tasksResponse = await jsonRequest('GET', '/api/tasks?limit=10'); + expect(tasksResponse.response.status).toBe(200); + + const tasks = (tasksResponse.json as { + tasks: Array<{ + id: string; + status: string; + stage: string; + workflow_run_id?: string | null; + }>; + }).tasks; + + expect(tasks.length).toBe(3); + expect(tasks.every((task) => task.status === 'queued')).toBe(true); + expect(tasks.every((task) => task.stage === 'queued')).toBe(true); + expect(tasks.every((task) => typeof task.workflow_run_id === 'string' && task.workflow_run_id.length > 0)).toBe(true); + }); + + it('updates notification read and silenced state via patch endpoint', async () => { + const created = await jsonRequest('POST', '/api/filings/0000000000-26-000010/analyze'); + const taskId = (created.json as { task: { id: string } }).task.id; + + const readUpdate = await jsonRequest('PATCH', `/api/tasks/${taskId}/notification`, { read: true }); + expect(readUpdate.response.status).toBe(200); + const readTask = (readUpdate.json as { + task: { + notification_read_at: string | null; + notification_silenced_at: string | null; + }; + }).task; + expect(readTask.notification_read_at).toBeTruthy(); + expect(readTask.notification_silenced_at).toBeNull(); + + const silencedUpdate = await jsonRequest('PATCH', `/api/tasks/${taskId}/notification`, { + silenced: true + }); + expect(silencedUpdate.response.status).toBe(200); + const silencedTask = (silencedUpdate.json as { + task: { + notification_read_at: string | null; + notification_silenced_at: string | null; + }; + }).task; + expect(silencedTask.notification_read_at).toBeTruthy(); + expect(silencedTask.notification_silenced_at).toBeTruthy(); + + const resetUpdate = await jsonRequest('PATCH', `/api/tasks/${taskId}/notification`, { + read: false, + silenced: false + }); + expect(resetUpdate.response.status).toBe(200); + const resetTask = (resetUpdate.json as { + task: { + notification_read_at: string | null; + notification_silenced_at: string | null; + }; + }).task; + expect(resetTask.notification_read_at).toBeNull(); + expect(resetTask.notification_silenced_at).toBeNull(); + }); + + it('reconciles workflow run status into projection state and degrades health when workflow backend is down', async () => { + const created = await jsonRequest('POST', '/api/filings/0000000000-26-000100/analyze'); + const task = (created.json as { + task: { id: string; workflow_run_id: string }; + }).task; + + runStatuses.set(task.workflow_run_id, 'running'); + const running = await jsonRequest('GET', `/api/tasks/${task.id}`); + expect(running.response.status).toBe(200); + const runningTask = (running.json as { task: { status: string; stage: string } }).task; + expect(runningTask.status).toBe('running'); + expect(runningTask.stage).toBe('running'); + + runStatuses.set(task.workflow_run_id, 'completed'); + const completed = await jsonRequest('GET', `/api/tasks/${task.id}`); + expect(completed.response.status).toBe(200); + const completedTask = (completed.json as { + task: { + status: string; + stage: string; + finished_at: string | null; + }; + }).task; + expect(completedTask.status).toBe('completed'); + expect(completedTask.stage).toBe('completed'); + expect(completedTask.finished_at).toBeTruthy(); + + const timeline = await jsonRequest('GET', `/api/tasks/${task.id}/timeline`); + expect(timeline.response.status).toBe(200); + const events = (timeline.json as { + events: Array<{ + stage: string; + status: string; + }>; + }).events; + expect(events.length).toBeGreaterThanOrEqual(3); + expect(events.some((event) => event.status === 'queued')).toBe(true); + expect(events.some((event) => event.status === 'running')).toBe(true); + expect(events.some((event) => event.status === 'completed')).toBe(true); + + const healthy = await jsonRequest('GET', '/api/health'); + expect(healthy.response.status).toBe(200); + expect((healthy.json as { status: string; workflow: { ok: boolean } }).status).toBe('ok'); + expect((healthy.json as { status: string; workflow: { ok: boolean } }).workflow.ok).toBe(true); + + workflowBackendHealthy = false; + const degraded = await jsonRequest('GET', '/api/health'); + expect(degraded.response.status).toBe(503); + expect((degraded.json as { + status: string; + workflow: { ok: boolean; reason: string }; + }).status).toBe('degraded'); + expect((degraded.json as { + status: string; + workflow: { ok: boolean; reason: string }; + }).workflow.ok).toBe(false); + }); + }); +} diff --git a/lib/server/db/schema.ts b/lib/server/db/schema.ts index 7e89a9c..eb06db6 100644 --- a/lib/server/db/schema.ts +++ b/lib/server/db/schema.ts @@ -288,6 +288,11 @@ export const taskRun = sqliteTable('task_run', { user_id: text('user_id').notNull().references(() => user.id, { onDelete: 'cascade' }), task_type: text('task_type').$type<'sync_filings' | 'refresh_prices' | 'analyze_filing' | 'portfolio_insights'>().notNull(), status: text('status').$type<'queued' | 'running' | 'completed' | 'failed'>().notNull(), + stage: text('stage').notNull(), + stage_detail: text('stage_detail'), + resource_key: text('resource_key'), + notification_read_at: text('notification_read_at'), + notification_silenced_at: text('notification_silenced_at'), priority: integer('priority').notNull(), payload: text('payload', { mode: 'json' }).$type>().notNull(), result: text('result', { mode: 'json' }).$type | null>(), @@ -301,9 +306,29 @@ export const taskRun = sqliteTable('task_run', { }, (table) => ({ taskUserCreatedIndex: index('task_user_created_idx').on(table.user_id, table.created_at), taskStatusIndex: index('task_status_idx').on(table.status), + taskUserResourceStatusIndex: index('task_user_resource_status_idx').on( + table.user_id, + table.task_type, + table.resource_key, + table.status, + table.created_at + ), taskWorkflowRunUnique: uniqueIndex('task_workflow_run_uidx').on(table.workflow_run_id) })); +export const taskStageEvent = sqliteTable('task_stage_event', { + id: integer('id').primaryKey({ autoIncrement: true }), + task_id: text('task_id').notNull().references(() => taskRun.id, { onDelete: 'cascade' }), + user_id: text('user_id').notNull().references(() => user.id, { onDelete: 'cascade' }), + stage: text('stage').notNull(), + stage_detail: text('stage_detail'), + status: text('status').$type<'queued' | 'running' | 'completed' | 'failed'>().notNull(), + created_at: text('created_at').notNull() +}, (table) => ({ + taskStageEventTaskCreatedIndex: index('task_stage_event_task_created_idx').on(table.task_id, table.created_at), + taskStageEventUserCreatedIndex: index('task_stage_event_user_created_idx').on(table.user_id, table.created_at) +})); + export const portfolioInsight = sqliteTable('portfolio_insight', { id: integer('id').primaryKey({ autoIncrement: true }), user_id: text('user_id').notNull().references(() => user.id, { onDelete: 'cascade' }), @@ -332,6 +357,7 @@ export const appSchema = { filingStatementSnapshot, filingLink, taskRun, + taskStageEvent, portfolioInsight }; diff --git a/lib/server/repos/tasks.ts b/lib/server/repos/tasks.ts index ecee1c3..1cfbef8 100644 --- a/lib/server/repos/tasks.ts +++ b/lib/server/repos/tasks.ts @@ -1,9 +1,10 @@ -import { and, desc, eq, inArray, sql } from 'drizzle-orm'; -import type { Task, TaskStatus, TaskType } from '@/lib/types'; +import { and, asc, desc, eq, inArray, sql } from 'drizzle-orm'; +import type { Task, TaskStage, TaskStageEvent, TaskStatus, TaskType } from '@/lib/types'; import { db } from '@/lib/server/db'; -import { taskRun } from '@/lib/server/db/schema'; +import { taskRun, taskStageEvent } from '@/lib/server/db/schema'; type TaskRow = typeof taskRun.$inferSelect; +type TaskStageEventRow = typeof taskStageEvent.$inferSelect; type CreateTaskInput = { id: string; @@ -12,14 +13,36 @@ type CreateTaskInput = { payload: Record; priority: number; max_attempts: number; + resource_key?: string | null; }; +type UpdateTaskNotificationStateInput = { + read?: boolean; + silenced?: boolean; +}; + +type EventInsertInput = { + task_id: string; + user_id: string; + stage: TaskStage; + stage_detail: string | null; + status: TaskStatus; + created_at: string; +}; + +type InsertExecutor = Pick; + function toTask(row: TaskRow): Task { return { id: row.id, user_id: row.user_id, task_type: row.task_type, status: row.status, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + resource_key: row.resource_key, + notification_read_at: row.notification_read_at, + notification_silenced_at: row.notification_silenced_at, priority: row.priority, payload: row.payload, result: row.result, @@ -33,30 +56,84 @@ function toTask(row: TaskRow): Task { }; } +function toTaskStageEvent(row: TaskStageEventRow): TaskStageEvent { + return { + id: row.id, + task_id: row.task_id, + user_id: row.user_id, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + status: row.status as TaskStatus, + created_at: row.created_at + }; +} + +function statusToStage(status: TaskStatus): TaskStage { + switch (status) { + case 'queued': + return 'queued'; + case 'running': + return 'running'; + case 'completed': + return 'completed'; + case 'failed': + return 'failed'; + default: + return 'failed'; + } +} + +async function insertTaskStageEvent(executor: InsertExecutor, input: EventInsertInput) { + await executor.insert(taskStageEvent).values({ + task_id: input.task_id, + user_id: input.user_id, + stage: input.stage, + stage_detail: input.stage_detail, + status: input.status, + created_at: input.created_at + }); +} + export async function createTaskRunRecord(input: CreateTaskInput) { const now = new Date().toISOString(); - const [row] = await db - .insert(taskRun) - .values({ - id: input.id, - user_id: input.user_id, - task_type: input.task_type, - status: 'queued', - priority: input.priority, - payload: input.payload, - result: null, - error: null, - attempts: 0, - max_attempts: input.max_attempts, - workflow_run_id: null, - created_at: now, - updated_at: now, - finished_at: null - }) - .returning(); + return await db.transaction(async (tx) => { + const [row] = await tx + .insert(taskRun) + .values({ + id: input.id, + user_id: input.user_id, + task_type: input.task_type, + status: 'queued', + stage: 'queued', + stage_detail: null, + resource_key: input.resource_key ?? null, + notification_read_at: null, + notification_silenced_at: null, + priority: input.priority, + payload: input.payload, + result: null, + error: null, + attempts: 0, + max_attempts: input.max_attempts, + workflow_run_id: null, + created_at: now, + updated_at: now, + finished_at: null + }) + .returning(); - return toTask(row); + await insertTaskStageEvent(tx, { + task_id: row.id, + user_id: row.user_id, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + status: row.status, + created_at: now + }); + + return toTask(row); + }); } export async function setTaskWorkflowRunId(taskId: string, workflowRunId: string) { @@ -121,67 +198,268 @@ export async function countTasksByStatus() { return queue; } -export async function claimQueuedTask(taskId: string) { +export async function findInFlightTaskByResourceKey( + userId: string, + taskType: TaskType, + resourceKey: string +) { const [row] = await db - .update(taskRun) - .set({ - status: 'running', - attempts: sql`${taskRun.attempts} + 1`, - updated_at: new Date().toISOString() - }) - .where(and(eq(taskRun.id, taskId), eq(taskRun.status, 'queued'))) - .returning(); + .select() + .from(taskRun) + .where(and( + eq(taskRun.user_id, userId), + eq(taskRun.task_type, taskType), + eq(taskRun.resource_key, resourceKey), + inArray(taskRun.status, ['queued', 'running']) + )) + .orderBy(desc(taskRun.created_at)) + .limit(1); return row ? toTask(row) : null; } +export async function markTaskRunning(taskId: string) { + const now = new Date().toISOString(); + + return await db.transaction(async (tx) => { + const [row] = await tx + .update(taskRun) + .set({ + status: 'running', + stage: 'running', + stage_detail: 'Workflow task is now running', + attempts: sql`${taskRun.attempts} + 1`, + updated_at: now, + finished_at: null + }) + .where(eq(taskRun.id, taskId)) + .returning(); + + if (!row) { + return null; + } + + await insertTaskStageEvent(tx, { + task_id: row.id, + user_id: row.user_id, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + status: row.status, + created_at: now + }); + + return toTask(row); + }); +} + +export async function updateTaskStage(taskId: string, stage: TaskStage, detail: string | null = null) { + const now = new Date().toISOString(); + + return await db.transaction(async (tx) => { + const [row] = await tx + .update(taskRun) + .set({ + stage, + stage_detail: detail, + updated_at: now + }) + .where(eq(taskRun.id, taskId)) + .returning(); + + if (!row) { + return null; + } + + await insertTaskStageEvent(tx, { + task_id: row.id, + user_id: row.user_id, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + status: row.status, + created_at: now + }); + + return toTask(row); + }); +} + export async function completeTask(taskId: string, result: Record) { + const now = new Date().toISOString(); + + return await db.transaction(async (tx) => { + const [row] = await tx + .update(taskRun) + .set({ + status: 'completed', + stage: 'completed', + stage_detail: null, + result, + error: null, + updated_at: now, + finished_at: now + }) + .where(eq(taskRun.id, taskId)) + .returning(); + + if (!row) { + return null; + } + + await insertTaskStageEvent(tx, { + task_id: row.id, + user_id: row.user_id, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + status: row.status, + created_at: now + }); + + return toTask(row); + }); +} + +export async function markTaskFailure(taskId: string, reason: string, stage: TaskStage = 'failed') { + const now = new Date().toISOString(); + + return await db.transaction(async (tx) => { + const [row] = await tx + .update(taskRun) + .set({ + status: 'failed', + stage, + stage_detail: null, + error: reason, + updated_at: now, + finished_at: now + }) + .where(eq(taskRun.id, taskId)) + .returning(); + + if (!row) { + return null; + } + + await insertTaskStageEvent(tx, { + task_id: row.id, + user_id: row.user_id, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + status: row.status, + created_at: now + }); + + return toTask(row); + }); +} + +export async function setTaskStatusFromWorkflow( + taskId: string, + status: TaskStatus, + error?: string | null +) { + const isTerminal = status === 'completed' || status === 'failed'; + const nextStage = statusToStage(status); + const nextError = status === 'failed' ? (error ?? 'Workflow run failed') : null; + + return await db.transaction(async (tx) => { + const [current] = await tx + .select() + .from(taskRun) + .where(eq(taskRun.id, taskId)) + .limit(1); + + if (!current) { + return null; + } + + const hasNoStateChange = current.status === status + && current.stage === nextStage + && (current.error ?? null) === nextError + && current.stage_detail === null + && (isTerminal ? current.finished_at !== null : current.finished_at === null); + + if (hasNoStateChange) { + return toTask(current); + } + + const now = new Date().toISOString(); + const [row] = await tx + .update(taskRun) + .set({ + status, + stage: nextStage, + stage_detail: null, + error: nextError, + updated_at: now, + finished_at: isTerminal ? now : null + }) + .where(eq(taskRun.id, taskId)) + .returning(); + + if (!row) { + return null; + } + + await insertTaskStageEvent(tx, { + task_id: row.id, + user_id: row.user_id, + stage: row.stage as TaskStage, + stage_detail: row.stage_detail, + status: row.status, + created_at: now + }); + + return toTask(row); + }); +} + +export async function updateTaskNotificationState( + taskId: string, + userId: string, + input: UpdateTaskNotificationStateInput +) { + const now = new Date().toISOString(); + const patch: Partial = { + updated_at: now + }; + + let hasMutation = false; + + if (typeof input.read === 'boolean') { + patch.notification_read_at = input.read ? now : null; + hasMutation = true; + } + + if (typeof input.silenced === 'boolean') { + patch.notification_silenced_at = input.silenced ? now : null; + hasMutation = true; + + if (input.silenced) { + patch.notification_read_at = now; + } + } + + if (!hasMutation) { + return await getTaskByIdForUser(taskId, userId); + } + const [row] = await db .update(taskRun) - .set({ - status: 'completed', - result, - error: null, - updated_at: new Date().toISOString(), - finished_at: new Date().toISOString() - }) - .where(eq(taskRun.id, taskId)) + .set(patch) + .where(and(eq(taskRun.id, taskId), eq(taskRun.user_id, userId))) .returning(); return row ? toTask(row) : null; } -export async function markTaskFailure(taskId: string, reason: string) { - const [current] = await db +export async function listTaskStageEventsForTask(taskId: string, userId: string) { + const rows = await db .select() - .from(taskRun) - .where(eq(taskRun.id, taskId)) - .limit(1); + .from(taskStageEvent) + .where(and(eq(taskStageEvent.task_id, taskId), eq(taskStageEvent.user_id, userId))) + .orderBy(asc(taskStageEvent.created_at), asc(taskStageEvent.id)); - if (!current) { - return { - task: null, - shouldRetry: false - }; - } - - const shouldRetry = current.attempts < current.max_attempts; - - const [updated] = await db - .update(taskRun) - .set({ - status: shouldRetry ? 'queued' : 'failed', - error: reason, - updated_at: new Date().toISOString(), - finished_at: shouldRetry ? null : new Date().toISOString() - }) - .where(eq(taskRun.id, taskId)) - .returning(); - - return { - task: updated ? toTask(updated) : null, - shouldRetry - }; + return rows.map(toTaskStageEvent); } export async function getTaskById(taskId: string) { diff --git a/lib/server/task-processors.ts b/lib/server/task-processors.ts index a7c4bec..621c478 100644 --- a/lib/server/task-processors.ts +++ b/lib/server/task-processors.ts @@ -3,7 +3,8 @@ import type { FilingExtraction, FilingExtractionMeta, Holding, - Task + Task, + TaskStage } from '@/lib/types'; import { runAiAnalysis } from '@/lib/server/ai'; import { buildPortfolioSummary } from '@/lib/server/portfolio'; @@ -24,6 +25,7 @@ import { listUserHoldings } from '@/lib/server/repos/holdings'; import { createPortfolioInsight } from '@/lib/server/repos/insights'; +import { updateTaskStage } from '@/lib/server/repos/tasks'; import { fetchFilingMetricsForFilings, fetchPrimaryFilingText, @@ -130,6 +132,10 @@ function toTaskResult(value: unknown): Record { return value as Record; } +async function setProjectionStage(task: Task, stage: TaskStage, detail: string | null = null) { + await updateTaskStage(task.id, stage, detail); +} + function parseTicker(raw: unknown) { if (typeof raw !== 'string' || raw.trim().length < 1) { throw new Error('Ticker is required'); @@ -513,6 +519,8 @@ function filingLinks(filing: { async function processSyncFilings(task: Task) { const ticker = parseTicker(task.payload.ticker); const limit = parseLimit(task.payload.limit, 20, 1, 50); + + await setProjectionStage(task, 'sync.fetch_filings', `Fetching up to ${limit} filings for ${ticker}`); const filings = await fetchRecentFilings(ticker, limit); const metricsByAccession = new Map(); const filingsByCik = new Map(); @@ -527,6 +535,7 @@ async function processSyncFilings(task: Task) { filingsByCik.set(filing.cik, [filing]); } + await setProjectionStage(task, 'sync.fetch_metrics', `Computing financial metrics for ${filings.length} filings`); for (const [cik, filingsForCik] of filingsByCik) { const filingsForFinancialMetrics = filingsForCik.filter((filing) => isFinancialMetricsForm(filing.filingType)); if (filingsForFinancialMetrics.length === 0) { @@ -548,6 +557,7 @@ async function processSyncFilings(task: Task) { } } + await setProjectionStage(task, 'sync.persist_filings', 'Persisting filings and links'); const saveResult = await upsertFilingsRecords( filings.map((filing) => ({ ticker: filing.ticker, @@ -574,6 +584,7 @@ async function processSyncFilings(task: Task) { return filing.filing_type === '10-K' || filing.filing_type === '10-Q'; }); + await setProjectionStage(task, 'sync.hydrate_statements', `Hydrating statement snapshots for ${hydrateCandidates.length} candidate filings`); for (const filing of hydrateCandidates) { const existingSnapshot = await getFilingStatementSnapshotByFilingId(filing.id); const shouldRefresh = !existingSnapshot @@ -634,15 +645,18 @@ async function processRefreshPrices(task: Task) { throw new Error('Task is missing user scope'); } + await setProjectionStage(task, 'refresh.load_holdings', 'Loading holdings for price refresh'); const userHoldings = await listHoldingsForPriceRefresh(userId); const tickers = [...new Set(userHoldings.map((entry) => entry.ticker))]; const quotes = new Map(); + await setProjectionStage(task, 'refresh.fetch_quotes', `Fetching quotes for ${tickers.length} tickers`); for (const ticker of tickers) { const quote = await getQuote(ticker); quotes.set(ticker, quote); } + await setProjectionStage(task, 'refresh.persist_prices', 'Writing refreshed prices to holdings'); const updatedCount = await applyRefreshedPrices(userId, quotes, new Date().toISOString()); return { @@ -660,6 +674,7 @@ async function processAnalyzeFiling(task: Task) { throw new Error('accessionNumber is required'); } + await setProjectionStage(task, 'analyze.load_filing', `Loading filing ${accessionNumber}`); const filing = await getFilingByAccession(accessionNumber); if (!filing) { @@ -676,6 +691,7 @@ async function processAnalyzeFiling(task: Task) { }; try { + await setProjectionStage(task, 'analyze.fetch_document', 'Fetching primary filing document'); const filingDocument = await fetchPrimaryFilingText({ filingUrl: filing.filing_url, cik: filing.cik, @@ -684,6 +700,7 @@ async function processAnalyzeFiling(task: Task) { }); if (filingDocument?.text) { + await setProjectionStage(task, 'analyze.extract', 'Generating extraction context from filing text'); const ruleBasedExtraction = buildRuleBasedExtraction(filing, filingDocument.text); extraction = ruleBasedExtraction; extractionMeta = { @@ -720,12 +737,14 @@ async function processAnalyzeFiling(task: Task) { }; } + await setProjectionStage(task, 'analyze.generate_report', 'Generating final filing analysis report'); const analysis = await runAiAnalysis( reportPrompt(filing, extraction, extractionMeta), 'Use concise institutional analyst language.', { workload: 'report' } ); + await setProjectionStage(task, 'analyze.persist_report', 'Persisting filing analysis output'); await saveFilingAnalysis(accessionNumber, { provider: analysis.provider, model: analysis.model, @@ -761,6 +780,7 @@ async function processPortfolioInsights(task: Task) { throw new Error('Task is missing user scope'); } + await setProjectionStage(task, 'insights.load_holdings', 'Loading holdings for portfolio insight generation'); const userHoldings = await listUserHoldings(userId); const summary = buildPortfolioSummary(userHoldings); @@ -771,12 +791,14 @@ async function processPortfolioInsights(task: Task) { 'Respond with: 1) health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.' ].join('\n'); + await setProjectionStage(task, 'insights.generate', 'Generating portfolio AI insight'); const analysis = await runAiAnalysis( prompt, 'Act as a risk-aware buy-side analyst.', { workload: 'report' } ); + await setProjectionStage(task, 'insights.persist', 'Persisting generated portfolio insight'); await createPortfolioInsight({ userId, provider: analysis.provider, diff --git a/lib/server/tasks.ts b/lib/server/tasks.ts index 07c4843..0493d84 100644 --- a/lib/server/tasks.ts +++ b/lib/server/tasks.ts @@ -1,14 +1,19 @@ import { randomUUID } from 'node:crypto'; -import { start } from 'workflow/api'; -import type { Task, TaskStatus, TaskType } from '@/lib/types'; +import { getRun, start } from 'workflow/api'; +import type { WorkflowRunStatus } from '@workflow/world'; +import type { Task, TaskStatus, TaskTimeline, TaskType } from '@/lib/types'; import { runTaskWorkflow } from '@/app/workflows/task-runner'; import { countTasksByStatus, createTaskRunRecord, + findInFlightTaskByResourceKey, getTaskByIdForUser, + listTaskStageEventsForTask, listRecentTasksForUser, markTaskFailure, - setTaskWorkflowRunId + setTaskStatusFromWorkflow, + setTaskWorkflowRunId, + updateTaskNotificationState } from '@/lib/server/repos/tasks'; type EnqueueTaskInput = { @@ -17,8 +22,71 @@ type EnqueueTaskInput = { payload?: Record; priority?: number; maxAttempts?: number; + resourceKey?: string; }; +type UpdateTaskNotificationInput = { + read?: boolean; + silenced?: boolean; +}; + +function mapWorkflowStatus(status: WorkflowRunStatus): TaskStatus { + switch (status) { + case 'pending': + return 'queued'; + case 'running': + return 'running'; + case 'completed': + return 'completed'; + case 'failed': + case 'cancelled': + return 'failed'; + default: + return 'failed'; + } +} + +function isProjectionPendingSync(task: Task) { + return task.status === 'queued' || task.status === 'running'; +} + +async function reconcileTaskWithWorkflow(task: Task) { + if (!task.workflow_run_id || !isProjectionPendingSync(task)) { + return task; + } + + try { + const run = getRun(task.workflow_run_id); + const workflowStatus = await run.status; + const nextStatus = mapWorkflowStatus(workflowStatus); + + if (nextStatus === task.status) { + return task; + } + + const nextError = nextStatus === 'failed' + ? workflowStatus === 'cancelled' + ? 'Workflow run cancelled' + : 'Workflow run failed' + : null; + + const updated = await setTaskStatusFromWorkflow(task.id, nextStatus, nextError); + + return updated ?? { + ...task, + status: nextStatus, + stage: nextStatus, + stage_detail: null, + error: nextError, + finished_at: nextStatus === 'queued' || nextStatus === 'running' + ? null + : task.finished_at ?? new Date().toISOString() + }; + } catch { + return task; + } +} + export async function enqueueTask(input: EnqueueTaskInput) { const task = await createTaskRunRecord({ id: randomUUID(), @@ -26,7 +94,8 @@ export async function enqueueTask(input: EnqueueTaskInput) { task_type: input.taskType, payload: input.payload ?? {}, priority: input.priority ?? 50, - max_attempts: input.maxAttempts ?? 3 + max_attempts: input.maxAttempts ?? 3, + resource_key: input.resourceKey ?? null }); try { @@ -41,17 +110,61 @@ export async function enqueueTask(input: EnqueueTaskInput) { const reason = error instanceof Error ? error.message : 'Failed to start workflow'; - await markTaskFailure(task.id, reason); + await markTaskFailure(task.id, reason, 'failed'); throw error; } } +export async function findInFlightTask(userId: string, taskType: TaskType, resourceKey: string) { + const task = await findInFlightTaskByResourceKey(userId, taskType, resourceKey); + + if (!task) { + return null; + } + + return await reconcileTaskWithWorkflow(task); +} + export async function getTaskById(taskId: string, userId: string) { - return await getTaskByIdForUser(taskId, userId); + const task = await getTaskByIdForUser(taskId, userId); + + if (!task) { + return null; + } + + return await reconcileTaskWithWorkflow(task); } export async function listRecentTasks(userId: string, limit = 20, statuses?: TaskStatus[]) { - return await listRecentTasksForUser(userId, limit, statuses); + const tasks = await listRecentTasksForUser(userId, limit, statuses); + return await Promise.all(tasks.map((task) => reconcileTaskWithWorkflow(task))); +} + +export async function updateTaskNotification( + userId: string, + taskId: string, + input: UpdateTaskNotificationInput +) { + const task = await updateTaskNotificationState(taskId, userId, input); + if (!task) { + return null; + } + + return await reconcileTaskWithWorkflow(task); +} + +export async function getTaskTimeline(taskId: string, userId: string): Promise { + const task = await getTaskById(taskId, userId); + if (!task) { + return null; + } + + const events = await listTaskStageEventsForTask(taskId, userId); + + return { + task, + events + }; } export async function getTaskQueueSnapshot() { diff --git a/lib/types.ts b/lib/types.ts index ec14677..81397d1 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -90,12 +90,37 @@ export type Filing = { export type TaskStatus = 'queued' | 'running' | 'completed' | 'failed'; export type TaskType = 'sync_filings' | 'refresh_prices' | 'analyze_filing' | 'portfolio_insights'; +export type TaskStage = + | 'queued' + | 'running' + | 'completed' + | 'failed' + | 'sync.fetch_filings' + | 'sync.fetch_metrics' + | 'sync.persist_filings' + | 'sync.hydrate_statements' + | 'refresh.load_holdings' + | 'refresh.fetch_quotes' + | 'refresh.persist_prices' + | 'analyze.load_filing' + | 'analyze.fetch_document' + | 'analyze.extract' + | 'analyze.generate_report' + | 'analyze.persist_report' + | 'insights.load_holdings' + | 'insights.generate' + | 'insights.persist'; export type Task = { id: string; user_id: string; task_type: TaskType; status: TaskStatus; + stage: TaskStage; + stage_detail: string | null; + resource_key: string | null; + notification_read_at: string | null; + notification_silenced_at: string | null; priority: number; payload: Record; result: Record | null; @@ -108,6 +133,21 @@ export type Task = { finished_at: string | null; }; +export type TaskStageEvent = { + id: number; + task_id: string; + user_id: string; + stage: TaskStage; + stage_detail: string | null; + status: TaskStatus; + created_at: string; +}; + +export type TaskTimeline = { + task: Task; + events: TaskStageEvent[]; +}; + export type PortfolioInsight = { id: number; user_id: string; diff --git a/package.json b/package.json index 8a3fe6b..70f84fe 100644 --- a/package.json +++ b/package.json @@ -8,10 +8,12 @@ "build": "bun --bun next build --turbopack", "start": "bun --bun next start", "lint": "bun --bun tsc --noEmit", + "workflow:setup": "workflow-postgres-setup", "backfill:filing-metrics": "bun run scripts/backfill-filing-metrics.ts", "backfill:filing-statements": "bun run scripts/backfill-filing-statements.ts", "db:generate": "bun x drizzle-kit generate", - "db:migrate": "bun x drizzle-kit migrate" + "db:migrate": "bun x drizzle-kit migrate", + "test:e2e:workflow": "RUN_TASK_WORKFLOW_E2E=1 bun test lib/server/api/task-workflow-hybrid.e2e.test.ts" }, "dependencies": { "@ai-sdk/openai": "^2.0.62", @@ -19,6 +21,7 @@ "@libsql/client": "^0.17.0", "@tailwindcss/postcss": "^4.2.1", "@tanstack/react-query": "^5.90.21", + "@workflow/world-postgres": "^4.1.0-beta.34", "ai": "^6.0.104", "better-auth": "^1.4.19", "clsx": "^2.1.1", @@ -30,6 +33,7 @@ "react": "^19.2.4", "react-dom": "^19.2.4", "recharts": "^3.7.0", + "sonner": "^2.0.7", "workflow": "^4.1.0-beta.60", "zhipu-ai-provider": "^0.2.2" },