feat: rebuild fiscal clone architecture and harden coolify deployment

This commit is contained in:
2026-02-23 21:10:39 -05:00
parent cae7cbb98f
commit 04e5caf4e1
61 changed files with 3826 additions and 2923 deletions

View File

@@ -1,14 +1,12 @@
FROM node:20-alpine AS base
FROM node:20-alpine
WORKDIR /app
# Install Bun and update npm
RUN npm install -g bun && npm install -g npm@latest
# Install dependencies
COPY package.json bun.lockb* ./
RUN bun install
COPY package.json bun.lock* ./
RUN bun install --frozen-lockfile || bun install
# Copy source code
COPY . .
ENV NODE_ENV=production
@@ -16,5 +14,4 @@ ENV PORT=3001
EXPOSE 3001
# Run directly from TypeScript source (Bun can execute TypeScript directly)
CMD ["bun", "run", "src/index.ts"]

View File

@@ -1,43 +1,64 @@
services:
postgres:
image: postgres:16-alpine
restart: unless-stopped
environment:
POSTGRES_USER: ${POSTGRES_USER:-postgres}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres}
POSTGRES_DB: ${POSTGRES_DB:-fiscal}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ['CMD-SHELL', 'pg_isready -U ${POSTGRES_USER:-postgres} -d ${POSTGRES_DB:-fiscal}']
interval: 5s
timeout: 5s
retries: 10
backend:
build:
context: .
dockerfile: Dockerfile
restart: unless-stopped
command: ['sh', '-c', 'bun run src/db/migrate.ts && bun run src/index.ts']
environment:
- DATABASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
- PORT=3001
DATABASE_URL: ${DATABASE_URL:-postgres://postgres:postgres@postgres:5432/fiscal}
PORT: ${PORT:-3001}
FRONTEND_URL: ${FRONTEND_URL:-http://localhost:3000}
BETTER_AUTH_SECRET: ${BETTER_AUTH_SECRET:-local-dev-better-auth-secret-change-me}
BETTER_AUTH_BASE_URL: ${BETTER_AUTH_BASE_URL:-http://localhost:3001}
SEC_USER_AGENT: ${SEC_USER_AGENT:-Fiscal Clone <support@example.com>}
OPENCLAW_BASE_URL: ${OPENCLAW_BASE_URL:-}
OPENCLAW_API_KEY: ${OPENCLAW_API_KEY:-}
OPENCLAW_MODEL: ${OPENCLAW_MODEL:-zeroclaw}
TASK_HEARTBEAT_SECONDS: ${TASK_HEARTBEAT_SECONDS:-15}
TASK_STALE_SECONDS: ${TASK_STALE_SECONDS:-120}
TASK_MAX_ATTEMPTS: ${TASK_MAX_ATTEMPTS:-3}
depends_on:
postgres:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3001/api/health"]
interval: 30s
timeout: 10s
retries: 3
networks:
- fiscal
postgres:
image: postgres:16-alpine
worker:
build:
context: .
dockerfile: Dockerfile
restart: unless-stopped
command: ['sh', '-c', 'bun run src/db/migrate.ts && bun run src/worker.ts']
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
interval: 5s
timeout: 5s
retries: 10
networks:
- fiscal
DATABASE_URL: ${DATABASE_URL:-postgres://postgres:postgres@postgres:5432/fiscal}
PORT: ${PORT:-3001}
FRONTEND_URL: ${FRONTEND_URL:-http://localhost:3000}
BETTER_AUTH_SECRET: ${BETTER_AUTH_SECRET:-local-dev-better-auth-secret-change-me}
BETTER_AUTH_BASE_URL: ${BETTER_AUTH_BASE_URL:-http://localhost:3001}
SEC_USER_AGENT: ${SEC_USER_AGENT:-Fiscal Clone <support@example.com>}
OPENCLAW_BASE_URL: ${OPENCLAW_BASE_URL:-}
OPENCLAW_API_KEY: ${OPENCLAW_API_KEY:-}
OPENCLAW_MODEL: ${OPENCLAW_MODEL:-zeroclaw}
TASK_HEARTBEAT_SECONDS: ${TASK_HEARTBEAT_SECONDS:-15}
TASK_STALE_SECONDS: ${TASK_STALE_SECONDS:-120}
TASK_MAX_ATTEMPTS: ${TASK_MAX_ATTEMPTS:-3}
depends_on:
postgres:
condition: service_healthy
volumes:
postgres_data:
networks:
fiscal:
external: true

View File

@@ -1,28 +1,26 @@
{
"name": "fiscal-backend",
"version": "0.1.0",
"version": "2.0.0",
"private": true,
"scripts": {
"dev": "bun run --watch src/index.ts",
"dev:worker": "bun run --watch src/worker.ts",
"start": "bun run src/index.ts",
"db:migrate": "bun run src/db/migrate.ts",
"db:seed": "bun run src/db/seed.ts"
"start:worker": "bun run src/worker.ts",
"db:migrate": "bun run src/db/migrate.ts"
},
"dependencies": {
"@elysiajs/cors": "^1.4.1",
"@elysiajs/swagger": "^1.3.1",
"bcryptjs": "^3.0.3",
"better-auth": "^1.4.18",
"dotenv": "^17.3.1",
"elysia": "^1.4.25",
"jsonwebtoken": "^9.0.3",
"pg": "^8.18.0",
"postgres": "^3.4.8",
"zod": "^4.3.6"
},
"devDependencies": {
"@types/pg": "^8.16.0",
"@types/bcryptjs": "^3.0.0",
"@types/jsonwebtoken": "^9.0.10",
"bun-types": "latest"
}
}

View File

@@ -1,34 +1,38 @@
import { betterAuth } from "better-auth";
import { Pool } from "pg";
import { betterAuth } from 'better-auth';
import { Pool } from 'pg';
import { env } from './config';
const defaultDatabaseUrl = `postgres://${process.env.POSTGRES_USER || 'postgres'}:${process.env.POSTGRES_PASSWORD || 'postgres'}@${process.env.POSTGRES_HOST || 'localhost'}:5432/${process.env.POSTGRES_DB || 'fiscal'}`;
const defaultFrontendUrl = process.env.FRONTEND_URL || 'http://localhost:3000';
const trustedOrigins = defaultFrontendUrl
.split(',')
.map((origin) => origin.trim())
.filter(Boolean);
const pool = new Pool({
connectionString: env.DATABASE_URL,
max: 20,
idleTimeoutMillis: 30_000
});
export const auth = betterAuth({
database: new Pool({
connectionString: process.env.DATABASE_URL || defaultDatabaseUrl,
}),
trustedOrigins,
secret: env.BETTER_AUTH_SECRET,
baseURL: env.BETTER_AUTH_BASE_URL,
database: pool,
trustedOrigins: env.FRONTEND_ORIGINS,
emailAndPassword: {
enabled: true,
autoSignIn: true,
autoSignIn: true
},
user: {
modelName: "users",
modelName: 'users',
additionalFields: {
name: {
type: "string",
required: false,
type: 'string',
required: false
},
},
image: {
type: 'string',
required: false
}
}
},
advanced: {
database: {
generateId: false, // Use PostgreSQL serial for users table
},
},
generateId: false
}
}
});

View File

@@ -1,109 +0,0 @@
import { db } from './db';
// One-shot migration script: upgrades the legacy users/password schema to the
// table layout Better Auth expects (session, account, verification), then
// copies existing credential passwords into the account table.
// Exits the process with 0 on success and 1 on any failure.
async function migrateToBetterAuth() {
console.log('Migrating to Better Auth schema...');
try {
// Add Better Auth columns to users table (idempotent via IF NOT EXISTS)
await db`
ALTER TABLE users
ADD COLUMN IF NOT EXISTS email_verified BOOLEAN DEFAULT FALSE
`;
await db`
ALTER TABLE users
ADD COLUMN IF NOT EXISTS image TEXT
`;
console.log('✅ Added Better Auth columns to users table');
// Create session table — one row per active login; token is the session key
await db`
CREATE TABLE IF NOT EXISTS session (
id TEXT PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
token TEXT NOT NULL UNIQUE,
expires_at TIMESTAMP NOT NULL,
ip_address TEXT,
user_agent TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`;
console.log('✅ Created session table');
// Create account table — links a user to an auth provider
// ('credential' for email+password; password hash lives here, not on users)
await db`
CREATE TABLE IF NOT EXISTS account (
id TEXT PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
access_token TEXT,
refresh_token TEXT,
access_token_expires_at TIMESTAMP,
refresh_token_expires_at TIMESTAMP,
scope TEXT,
id_token TEXT,
password TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, provider_id, account_id)
)
`;
console.log('✅ Created account table');
// Create verification table — short-lived tokens (email verification etc.)
await db`
CREATE TABLE IF NOT EXISTS verification (
id TEXT PRIMARY KEY,
identifier TEXT NOT NULL,
value TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`;
console.log('✅ Created verification table');
// Create indexes for the common lookup paths (token, user_id, expiry sweeps)
await db`CREATE INDEX IF NOT EXISTS idx_session_user_id ON session(user_id)`;
await db`CREATE INDEX IF NOT EXISTS idx_session_token ON session(token)`;
await db`CREATE INDEX IF NOT EXISTS idx_session_expires_at ON session(expires_at)`;
await db`CREATE INDEX IF NOT EXISTS idx_account_user_id ON account(user_id)`;
await db`CREATE INDEX IF NOT EXISTS idx_account_provider_id ON account(provider_id)`;
await db`CREATE INDEX IF NOT EXISTS idx_verification_identifier ON verification(identifier)`;
await db`CREATE INDEX IF NOT EXISTS idx_verification_expires_at ON verification(expires_at)`;
console.log('✅ Created indexes');
// Migrate existing users to account table for credential auth.
// NOTE(review): gen_random_uuid() requires pgcrypto (or PostgreSQL 13+
// where it is built in) — confirm the extension is available before running.
// ON CONFLICT DO NOTHING makes re-runs safe for already-migrated users.
await db`
INSERT INTO account (id, user_id, account_id, provider_id, password, created_at, updated_at)
SELECT
gen_random_uuid(),
id,
id::text,
'credential',
password,
created_at,
updated_at
FROM users
WHERE password IS NOT NULL
ON CONFLICT DO NOTHING
`;
console.log('✅ Migrated existing users to account table');
console.log('✅ Better Auth migration completed!');
process.exit(0);
} catch (error) {
console.error('❌ Migration failed:', error);
process.exit(1);
}
}
migrateToBetterAuth();

47
backend/src/config.ts Normal file
View File

@@ -0,0 +1,47 @@
import * as dotenv from 'dotenv';
import { z } from 'zod';
// Load .env before validating; real environment variables win over .env values.
dotenv.config();
// Single source of truth for runtime configuration. Every variable has a
// development-safe default so the app boots locally with zero configuration;
// production deployments are expected to override the secrets.
const schema = z.object({
NODE_ENV: z.enum(['development', 'test', 'production']).default('development'),
PORT: z.coerce.number().int().positive().default(3001),
// Optional: when absent, a URL is assembled from the POSTGRES_* parts below.
DATABASE_URL: z.string().optional(),
POSTGRES_USER: z.string().default('postgres'),
POSTGRES_PASSWORD: z.string().default('postgres'),
POSTGRES_HOST: z.string().default('localhost'),
POSTGRES_DB: z.string().default('fiscal'),
// May be a comma-separated list; split into FRONTEND_ORIGINS below.
FRONTEND_URL: z.string().default('http://localhost:3000'),
BETTER_AUTH_SECRET: z.string().min(16).default('local-dev-better-auth-secret-change-me-1234'),
BETTER_AUTH_BASE_URL: z.string().url().default('http://localhost:3001'),
SEC_USER_AGENT: z.string().default('Fiscal Clone <support@fiscal.local>'),
// Empty strings are treated as "unset" so `VAR=` in .env doesn't fail .url().
OPENCLAW_BASE_URL: z.preprocess(
(value) => (typeof value === 'string' && value.trim() === '' ? undefined : value),
z.string().url().optional()
),
OPENCLAW_API_KEY: z.preprocess(
(value) => (typeof value === 'string' && value.trim() === '' ? undefined : value),
z.string().optional()
),
OPENCLAW_MODEL: z.string().default('zeroclaw'),
TASK_HEARTBEAT_SECONDS: z.coerce.number().int().positive().default(15),
TASK_STALE_SECONDS: z.coerce.number().int().positive().default(120),
TASK_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3)
});
// Fail fast at module load: a misconfigured environment should stop the
// process before any server or worker starts.
const parsed = schema.safeParse(process.env);
if (!parsed.success) {
console.error('Invalid environment configuration', parsed.error.flatten().fieldErrors);
throw new Error('Invalid environment variables');
}
const rawEnv = parsed.data;
// Prefer the explicit DATABASE_URL; otherwise build one from the parts.
// NOTE(review): the assembled URL hard-codes port 5432 — confirm that is
// intended for all deployments.
const databaseUrl = rawEnv.DATABASE_URL
?? `postgres://${rawEnv.POSTGRES_USER}:${rawEnv.POSTGRES_PASSWORD}@${rawEnv.POSTGRES_HOST}:5432/${rawEnv.POSTGRES_DB}`;
// Exported config: validated values plus two derived fields
// (resolved DATABASE_URL and the parsed FRONTEND_ORIGINS list).
export const env = {
...rawEnv,
DATABASE_URL: databaseUrl,
FRONTEND_ORIGINS: rawEnv.FRONTEND_URL.split(',').map((origin) => origin.trim()).filter(Boolean)
};

View File

@@ -1,47 +1,13 @@
import postgres from 'postgres';
import { env } from '../config';
const defaultDatabaseUrl = `postgres://${process.env.POSTGRES_USER || 'postgres'}:${process.env.POSTGRES_PASSWORD || 'postgres'}@${process.env.POSTGRES_HOST || 'localhost'}:5432/${process.env.POSTGRES_DB || 'fiscal'}`;
const sql = postgres(process.env.DATABASE_URL || defaultDatabaseUrl, {
max: 10,
export const db = postgres(env.DATABASE_URL, {
max: 20,
idle_timeout: 20,
connect_timeout: 10
connect_timeout: 10,
prepare: true
});
export const db = sql;
export type Filings = {
id: number;
ticker: string;
filing_type: string;
filing_date: Date;
accession_number: string;
cik: string;
company_name: string;
key_metrics?: any;
insights?: string;
created_at: Date;
};
export type Portfolio = {
id: number;
user_id: string;
ticker: string;
shares: number;
avg_cost: number;
current_price?: number;
current_value?: number;
gain_loss?: number;
gain_loss_pct?: number;
last_updated?: Date;
created_at: Date;
};
export type Watchlist = {
id: number;
user_id: string;
ticker: string;
company_name: string;
sector?: string;
created_at: Date;
};
export async function closeDb() {
await db.end({ timeout: 5 });
}

View File

@@ -1,107 +1,256 @@
import { db } from './index';
async function migrate() {
console.log('Running migrations...');
console.log('Running database migrations...');
await db`CREATE EXTENSION IF NOT EXISTS pgcrypto`;
// Create users table
await db`
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password TEXT NOT NULL,
name VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
email TEXT UNIQUE NOT NULL,
email_verified BOOLEAN NOT NULL DEFAULT FALSE,
name TEXT,
image TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
await db`ALTER TABLE users ADD COLUMN IF NOT EXISTS email_verified BOOLEAN NOT NULL DEFAULT FALSE`;
await db`ALTER TABLE users ADD COLUMN IF NOT EXISTS image TEXT`;
await db`
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_name = 'users'
AND column_name = 'password'
) THEN
EXECUTE 'ALTER TABLE users ALTER COLUMN password DROP NOT NULL';
END IF;
END
$$
`;
await db`
CREATE TABLE IF NOT EXISTS session (
id TEXT PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
token TEXT NOT NULL UNIQUE,
expires_at TIMESTAMPTZ NOT NULL,
ip_address TEXT,
user_agent TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
await db`
CREATE TABLE IF NOT EXISTS account (
id TEXT PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
access_token TEXT,
refresh_token TEXT,
access_token_expires_at TIMESTAMPTZ,
refresh_token_expires_at TIMESTAMPTZ,
scope TEXT,
id_token TEXT,
password TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(user_id, provider_id, account_id)
)
`;
await db`
CREATE TABLE IF NOT EXISTS verification (
id TEXT PRIMARY KEY,
identifier TEXT NOT NULL,
value TEXT NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
// Create filings table
await db`
CREATE TABLE IF NOT EXISTS filings (
id SERIAL PRIMARY KEY,
ticker VARCHAR(10) NOT NULL,
id BIGSERIAL PRIMARY KEY,
ticker VARCHAR(12) NOT NULL,
filing_type VARCHAR(20) NOT NULL,
filing_date DATE NOT NULL,
accession_number VARCHAR(40) UNIQUE NOT NULL,
accession_number VARCHAR(40) NOT NULL UNIQUE,
cik VARCHAR(20) NOT NULL,
company_name TEXT NOT NULL,
key_metrics JSONB,
insights TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
filing_url TEXT,
metrics JSONB,
analysis JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
// Create portfolio table
await db`ALTER TABLE filings ADD COLUMN IF NOT EXISTS filing_url TEXT`;
await db`ALTER TABLE filings ADD COLUMN IF NOT EXISTS metrics JSONB`;
await db`ALTER TABLE filings ADD COLUMN IF NOT EXISTS analysis JSONB`;
await db`ALTER TABLE filings ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()`;
await db`
CREATE TABLE IF NOT EXISTS portfolio (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
ticker VARCHAR(10) NOT NULL,
shares NUMERIC(20, 4) NOT NULL,
avg_cost NUMERIC(10, 4) NOT NULL,
current_price NUMERIC(10, 4),
current_value NUMERIC(20, 4),
gain_loss NUMERIC(20, 4),
gain_loss_pct NUMERIC(10, 4),
last_updated TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, ticker)
)
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_name = 'filings'
AND column_name = 'key_metrics'
) THEN
EXECUTE 'UPDATE filings SET metrics = COALESCE(metrics, key_metrics) WHERE metrics IS NULL';
END IF;
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_name = 'filings'
AND column_name = 'insights'
) THEN
EXECUTE $migrate$
UPDATE filings
SET analysis = COALESCE(analysis, jsonb_build_object('legacyInsights', insights))
WHERE analysis IS NULL
AND insights IS NOT NULL
$migrate$;
END IF;
END
$$
`;
// Create watchlist table
await db`
CREATE TABLE IF NOT EXISTS watchlist (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
ticker VARCHAR(10) NOT NULL,
id BIGSERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
ticker VARCHAR(12) NOT NULL,
company_name TEXT NOT NULL,
sector VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
sector VARCHAR(120),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(user_id, ticker)
)
`;
// Create indexes
await db`CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)`;
await db`CREATE INDEX IF NOT EXISTS idx_filings_ticker ON filings(ticker)`;
await db`CREATE INDEX IF NOT EXISTS idx_filings_date ON filings(filing_date DESC)`;
await db`CREATE INDEX IF NOT EXISTS idx_portfolio_user ON portfolio(user_id)`;
await db`CREATE INDEX IF NOT EXISTS idx_watchlist_user ON watchlist(user_id)`;
// Create function to update portfolio prices
await db`
CREATE OR REPLACE FUNCTION update_portfolio_prices()
RETURNS TRIGGER AS $$
CREATE TABLE IF NOT EXISTS holdings (
id BIGSERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
ticker VARCHAR(12) NOT NULL,
shares NUMERIC(20, 4) NOT NULL,
avg_cost NUMERIC(12, 4) NOT NULL,
current_price NUMERIC(12, 4),
market_value NUMERIC(20, 4) GENERATED ALWAYS AS ((COALESCE(current_price, avg_cost) * shares)) STORED,
gain_loss NUMERIC(20, 4) GENERATED ALWAYS AS (((COALESCE(current_price, avg_cost) - avg_cost) * shares)) STORED,
gain_loss_pct NUMERIC(12, 4) GENERATED ALWAYS AS (
CASE
WHEN avg_cost > 0 THEN (((COALESCE(current_price, avg_cost) - avg_cost) / avg_cost) * 100)
ELSE 0
END
) STORED,
last_price_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(user_id, ticker)
)
`;
await db`
DO $$
BEGIN
NEW.current_value := NEW.shares * NEW.current_price;
NEW.gain_loss := NEW.current_value - (NEW.shares * NEW.avg_cost);
NEW.gain_loss_pct := CASE
WHEN NEW.avg_cost > 0 THEN ((NEW.current_price - NEW.avg_cost) / NEW.avg_cost) * 100
ELSE 0
END;
NEW.last_updated := NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'portfolio'
) THEN
EXECUTE $migrate$
INSERT INTO holdings (
user_id,
ticker,
shares,
avg_cost,
current_price,
last_price_at,
created_at,
updated_at
)
SELECT
user_id,
ticker,
shares,
avg_cost,
current_price,
last_updated,
created_at,
NOW()
FROM portfolio
ON CONFLICT (user_id, ticker) DO NOTHING
$migrate$;
END IF;
END
$$
`;
// Create trigger
await db`
DROP TRIGGER IF EXISTS update_portfolio_prices_trigger ON portfolio
`;
await db`
CREATE TRIGGER update_portfolio_prices_trigger
BEFORE INSERT OR UPDATE ON portfolio
FOR EACH ROW
EXECUTE FUNCTION update_portfolio_prices()
CREATE TABLE IF NOT EXISTS portfolio_insights (
id BIGSERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
provider TEXT NOT NULL,
model TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`;
console.log('✅ Migrations completed!');
process.exit(0);
await db`
CREATE TABLE IF NOT EXISTS long_tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_type TEXT NOT NULL,
status TEXT NOT NULL,
priority INTEGER NOT NULL DEFAULT 50,
payload JSONB NOT NULL,
result JSONB,
error TEXT,
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
heartbeat_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
created_by INTEGER REFERENCES users(id) ON DELETE SET NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT long_tasks_status_check CHECK (status IN ('queued', 'running', 'completed', 'failed'))
)
`;
await db`CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)`;
await db`CREATE INDEX IF NOT EXISTS idx_session_token ON session(token)`;
await db`CREATE INDEX IF NOT EXISTS idx_session_user ON session(user_id)`;
await db`CREATE INDEX IF NOT EXISTS idx_account_user ON account(user_id)`;
await db`CREATE INDEX IF NOT EXISTS idx_watchlist_user ON watchlist(user_id)`;
await db`CREATE INDEX IF NOT EXISTS idx_holdings_user ON holdings(user_id)`;
await db`CREATE INDEX IF NOT EXISTS idx_filings_ticker_date ON filings(ticker, filing_date DESC)`;
await db`CREATE INDEX IF NOT EXISTS idx_filings_accession ON filings(accession_number)`;
await db`CREATE INDEX IF NOT EXISTS idx_portfolio_insights_user ON portfolio_insights(user_id, created_at DESC)`;
await db`CREATE INDEX IF NOT EXISTS idx_long_tasks_status_sched ON long_tasks(status, scheduled_at, priority DESC, created_at)`;
await db`CREATE INDEX IF NOT EXISTS idx_long_tasks_user ON long_tasks(created_by, created_at DESC)`;
console.log('Migrations completed successfully.');
}
migrate().catch(error => {
console.error('❌ Migration failed:', error);
process.exit(1);
});
migrate()
.then(() => process.exit(0))
.catch((error) => {
console.error('Migration failed', error);
process.exit(1);
});

View File

@@ -1,51 +1,58 @@
import { Elysia } from 'elysia';
import { cors } from '@elysiajs/cors';
import { swagger } from '@elysiajs/swagger';
import * as dotenv from 'dotenv';
dotenv.config();
import { env } from './config';
import { db } from './db';
import { filingsRoutes } from './routes/filings';
import { portfolioRoutes } from './routes/portfolio';
import { openclawRoutes } from './routes/openclaw';
import { watchlistRoutes } from './routes/watchlist';
import { betterAuthRoutes } from './routes/better-auth';
import { filingsRoutes } from './routes/filings';
import { meRoutes } from './routes/me';
import { openclawRoutes } from './routes/openclaw';
import { portfolioRoutes } from './routes/portfolio';
import { taskRoutes } from './routes/tasks';
import { watchlistRoutes } from './routes/watchlist';
const frontendOrigin = process.env.FRONTEND_URL || 'http://localhost:3000';
const app = new Elysia({
prefix: '/api'
})
const app = new Elysia({ prefix: '/api' })
.use(cors({
origin: frontendOrigin,
origin: env.FRONTEND_ORIGINS,
credentials: true,
methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
allowedHeaders: ['Content-Type', 'Authorization'],
methods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS']
}))
.use(swagger({
documentation: {
info: {
title: 'Fiscal Clone API',
version: '1.0.0',
description: 'Financial filings and portfolio analytics API'
version: '2.0.0',
description: 'Futuristic fiscal intelligence API with durable jobs and OpenClaw integration.'
}
}
}))
.use(betterAuthRoutes)
.use(filingsRoutes)
.use(portfolioRoutes)
.use(meRoutes)
.use(watchlistRoutes)
.use(portfolioRoutes)
.use(filingsRoutes)
.use(openclawRoutes)
.use(taskRoutes)
.get('/health', async () => {
const queueRows = await db`
SELECT status, COUNT(*)::int AS count
FROM long_tasks
GROUP BY status
`;
// Health check
.get('/health', () => ({
status: 'ok',
timestamp: new Date().toISOString(),
version: '1.0.0',
database: 'connected'
}))
return {
status: 'ok',
version: '2.0.0',
timestamp: new Date().toISOString(),
queue: queueRows.reduce<Record<string, number>>((acc, row) => {
acc[row.status] = row.count;
return acc;
}, {})
};
});
.listen(process.env.PORT || 3001);
app.listen(env.PORT);
console.log(`🚀 Backend running on http://localhost:${app.server?.port}`);
console.log(`📚 Swagger docs: http://localhost:${app.server?.port}/swagger`);
console.log(`Fiscal backend listening on http://localhost:${app.server?.port}`);
console.log(`Swagger docs: http://localhost:${app.server?.port}/swagger`);

View File

@@ -1,122 +0,0 @@
import { Elysia, t } from 'elysia';
import * as bcrypt from 'bcryptjs';
import jwt from 'jsonwebtoken';
import { db } from '../db';
// NOTE(review): falling back to a hard-coded secret means tokens are forgeable
// whenever JWT_SECRET is unset — this should fail fast in production instead.
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key-change-in-production';
// Legacy JWT auth routes (register / login / verify) predating Better Auth.
// NOTE(review): /register and /login return `{ error }` bodies with HTTP 200;
// callers must inspect the body rather than the status code.
export const authRoutes = new Elysia({ prefix: '/auth' })
/**
* Register new user.
* Rejects duplicate emails, stores a bcrypt hash (cost 10), and returns a
* 30-day JWT alongside the public user fields.
*/
.post('/register', async ({ body }) => {
const { email, password, name } = body;
// Check if user exists
const existing = await db`
SELECT id FROM users WHERE email = ${email}
`;
if (existing.length > 0) {
return { error: 'User already exists' };
}
// Hash password
const hashedPassword = await bcrypt.hash(password, 10);
// Create user
const result = await db`
INSERT INTO users ${db({ email, password: hashedPassword, name })}
RETURNING id, email, name
`;
const user = result[0];
// Generate JWT
const token = jwt.sign(
{ id: user.id, email: user.email },
JWT_SECRET,
{ expiresIn: '30d' }
);
return {
success: true,
user: { id: user.id, email: user.email, name: user.name },
token
};
}, {
body: t.Object({
email: t.String({ format: 'email' }),
password: t.String({ minLength: 8 }),
name: t.String()
})
})
/**
* Login.
* Returns the same generic 'Invalid credentials' for unknown email and for
* a wrong password, so account existence is not leaked.
*/
.post('/login', async ({ body }) => {
const { email, password } = body;
// Find user
const users = await db`
SELECT * FROM users WHERE email = ${email}
`;
if (users.length === 0) {
return { error: 'Invalid credentials' };
}
const user = users[0];
// Verify password
const validPassword = await bcrypt.compare(password, user.password);
if (!validPassword) {
return { error: 'Invalid credentials' };
}
// Generate JWT
const token = jwt.sign(
{ id: user.id, email: user.email },
JWT_SECRET,
{ expiresIn: '30d' }
);
return {
success: true,
user: { id: user.id, email: user.email, name: user.name },
token
};
}, {
body: t.Object({
email: t.String({ format: 'email' }),
password: t.String()
})
})
/**
* Verify token (for NextAuth credentials provider).
* Responds 401 for an invalid/expired signature or a payload missing
* id/email; otherwise echoes back the decoded identity.
*/
.post('/verify', async ({ body, set }) => {
try {
const decoded = jwt.verify(body.token, JWT_SECRET) as any;
if (!decoded.id || !decoded.email) {
set.status = 401;
return { error: 'Invalid token' };
}
return {
success: true,
user: { id: decoded.id, email: decoded.email }
};
} catch (error) {
set.status = 401;
return { error: 'Invalid token' };
}
}, {
body: t.Object({
token: t.String()
})
});

View File

@@ -1,7 +1,7 @@
import { Elysia } from 'elysia';
import { auth } from '../auth';
export const betterAuthRoutes = new Elysia()
.all('/auth/*', async ({ request }) => {
return auth.handler(request);
export const betterAuthRoutes = new Elysia({ prefix: '/auth' })
.all('/*', async ({ request }) => {
return await auth.handler(request);
});

View File

@@ -0,0 +1,16 @@
import { UnauthorizedError } from '../session';
/**
 * Translates a thrown value into an HTTP response: sets `set.status`
 * (401 for auth failures, 500 otherwise) and returns the JSON error body.
 * Non-Error throwables produce a generic 'Unexpected error' message.
 */
export function toHttpError(set: { status: number }, error: unknown) {
  if (error instanceof UnauthorizedError) {
    set.status = 401;
    return { error: error.message };
  }
  set.status = 500;
  const message = error instanceof Error ? error.message : 'Unexpected error';
  return { error: message };
}

View File

@@ -1,47 +1,107 @@
import { Elysia, t } from 'elysia';
import { SECScraper } from '../services/sec';
import { db } from '../db';
const sec = new SECScraper();
import { requireSessionUser } from '../session';
import { enqueueTask } from '../tasks/repository';
import { toHttpError } from './error';
export const filingsRoutes = new Elysia({ prefix: '/filings' })
.get('/', async () => {
const filings = await db`
SELECT * FROM filings
ORDER BY filing_date DESC
LIMIT 100
`;
return filings;
})
.get('/', async ({ request, set, query }) => {
try {
await requireSessionUser(request);
const tickerFilter = query.ticker?.trim().toUpperCase();
const limit = Number(query.limit ?? 50);
const safeLimit = Number.isFinite(limit) ? Math.min(Math.max(limit, 1), 200) : 50;
.get('/:ticker', async ({ params }) => {
const filings = await db`
SELECT * FROM filings
WHERE ticker = ${params.ticker.toUpperCase()}
ORDER BY filing_date DESC
LIMIT 50
`;
return filings;
})
const rows = tickerFilter
? await db`
SELECT *
FROM filings
WHERE ticker = ${tickerFilter}
ORDER BY filing_date DESC, created_at DESC
LIMIT ${safeLimit}
`
: await db`
SELECT *
FROM filings
ORDER BY filing_date DESC, created_at DESC
LIMIT ${safeLimit}
`;
.get('/details/:accessionNumber', async ({ params }) => {
const details = await db`
SELECT * FROM filings
WHERE accession_number = ${params.accessionNumber}
`;
return details[0] || null;
})
.post('/refresh/:ticker', async ({ params }) => {
const newFilings = await sec.searchFilings(params.ticker, 5);
for (const filing of newFilings) {
const metrics = await sec['extractKeyMetrics'](filing);
await db`
INSERT INTO filings ${db(filing, metrics)}
ON CONFLICT (accession_number) DO NOTHING
`;
return { filings: rows };
} catch (error) {
return toHttpError(set, error);
}
}, {
query: t.Object({
ticker: t.Optional(t.String()),
limit: t.Optional(t.Numeric())
})
})
.get('/:accessionNumber', async ({ request, set, params }) => {
try {
await requireSessionUser(request);
const rows = await db`
SELECT *
FROM filings
WHERE accession_number = ${params.accessionNumber}
LIMIT 1
`;
return { success: true, count: newFilings.length };
if (!rows[0]) {
set.status = 404;
return { error: 'Filing not found' };
}
return { filing: rows[0] };
} catch (error) {
return toHttpError(set, error);
}
}, {
params: t.Object({
accessionNumber: t.String({ minLength: 8 })
})
})
.post('/sync', async ({ request, set, body }) => {
try {
const user = await requireSessionUser(request);
const task = await enqueueTask({
taskType: 'sync_filings',
payload: {
ticker: body.ticker.trim().toUpperCase(),
limit: body.limit ?? 20
},
createdBy: user.id,
priority: 90
});
return { task };
} catch (error) {
return toHttpError(set, error);
}
}, {
body: t.Object({
ticker: t.String({ minLength: 1, maxLength: 12 }),
limit: t.Optional(t.Number({ minimum: 1, maximum: 50 }))
})
})
.post('/:accessionNumber/analyze', async ({ request, set, params }) => {
try {
const user = await requireSessionUser(request);
const task = await enqueueTask({
taskType: 'analyze_filing',
payload: {
accessionNumber: params.accessionNumber
},
createdBy: user.id,
priority: 65
});
return { task };
} catch (error) {
return toHttpError(set, error);
}
}, {
params: t.Object({
accessionNumber: t.String({ minLength: 8 })
})
});

13
backend/src/routes/me.ts Normal file
View File

@@ -0,0 +1,13 @@
import { Elysia } from 'elysia';
import { requireSessionUser } from '../session';
import { toHttpError } from './error';
// GET /me — returns the authenticated user resolved from the request's
// session. Errors (including missing/invalid session) are mapped to an
// HTTP status + `{ error }` body by toHttpError.
export const meRoutes = new Elysia({ prefix: '/me' })
.get('/', async ({ request, set }) => {
try {
const user = await requireSessionUser(request);
return { user };
} catch (error) {
return toHttpError(set, error);
}
});

View File

@@ -1,121 +1,53 @@
import { Elysia, t } from 'elysia';
import { db } from '../db';
import { env } from '../config';
import { requireSessionUser } from '../session';
import { enqueueTask } from '../tasks/repository';
import { toHttpError } from './error';
interface OpenClawMessage {
text: string;
channelId?: string;
}
export const openclawRoutes = new Elysia({ prefix: '/openclaw' })
/**
* Trigger Discord notification for new filing
*/
.post('/notify/filing', async ({ body }) => {
// This endpoint can be called by cron jobs or external webhooks
// to send Discord notifications about new filings
const message = `📄 **New SEC Filing**
**Ticker:** ${body.ticker}
**Type:** ${body.filingType}
**Date:** ${body.filingDate}
View details: ${body.url}`;
// In production, this would send to Discord via webhook
// For now, we just log it
console.log('[DISCORD]', message);
return { success: true, message };
}, {
body: t.Object({
ticker: t.String(),
filingType: t.String(),
filingDate: t.String(),
url: t.String()
})
})
/**
* Get AI insights for portfolio
*/
.post('/insights/portfolio', async ({ body }) => {
const holdings = await db`
SELECT * FROM portfolio
WHERE user_id = ${body.userId}
`;
// Generate AI analysis
const prompt = `
Analyze this portfolio:
${JSON.stringify(holdings, null, 2)}
Provide:
1. Overall portfolio health assessment
2. Risk analysis
3. Top 3 recommendations
4. Any concerning patterns
`;
// This would call OpenClaw's AI
// For now, return placeholder
return {
health: 'moderate',
risk: 'medium',
recommendations: [
'Consider diversifying sector exposure',
'Review underperforming positions',
'Rebalance portfolio'
],
analysis: 'Portfolio shows mixed performance with some concentration risk.'
};
}, {
body: t.Object({
userId: t.String()
})
})
/**
* Get AI insights for a specific filing
*/
.post('/insights/filing', async ({ body }) => {
const filing = await db`
SELECT * FROM filings
WHERE accession_number = ${body.accessionNumber}
`;
if (!filing) {
return { error: 'Filing not found' };
export const openclawRoutes = new Elysia({ prefix: '/ai' })
.get('/status', async ({ request, set }) => {
try {
await requireSessionUser(request);
return {
configured: Boolean(env.OPENCLAW_BASE_URL && env.OPENCLAW_API_KEY),
baseUrl: env.OPENCLAW_BASE_URL ?? null,
model: env.OPENCLAW_MODEL
};
} catch (error) {
return toHttpError(set, error);
}
})
.post('/portfolio-insights', async ({ request, set }) => {
try {
const user = await requireSessionUser(request);
const task = await enqueueTask({
taskType: 'portfolio_insights',
payload: { userId: user.id },
createdBy: user.id,
priority: 70
});
const prompt = `
Analyze this SEC filing:
return { task };
} catch (error) {
return toHttpError(set, error);
}
})
.post('/filing-insights', async ({ request, set, body }) => {
try {
const user = await requireSessionUser(request);
const task = await enqueueTask({
taskType: 'analyze_filing',
payload: { accessionNumber: body.accessionNumber },
createdBy: user.id,
priority: 65
});
**Company:** ${filing.company_name}
**Ticker:** ${filing.ticker}
**Type:** ${filing.filing_type}
**Date:** ${filing.filing_date}
**Key Metrics:**
${JSON.stringify(filing.key_metrics, null, 2)}
Provide key insights and any red flags.
`;
// Store insights
await db`
UPDATE filings
SET insights = ${prompt}
WHERE accession_number = ${body.accessionNumber}
`;
return {
insights: 'Analysis saved',
filing
};
return { task };
} catch (error) {
return toHttpError(set, error);
}
}, {
body: t.Object({
accessionNumber: t.String()
accessionNumber: t.String({ minLength: 8 })
})
});

View File

@@ -1,65 +1,197 @@
import { Elysia, t } from 'elysia';
import { db, type Portfolio } from '../db';
import { db } from '../db';
import { requireSessionUser } from '../session';
import { enqueueTask } from '../tasks/repository';
import { toHttpError } from './error';
export const portfolioRoutes = new Elysia({ prefix: '/portfolio' })
.get('/:userId', async ({ params }) => {
const holdings = await db`
SELECT * FROM portfolio
WHERE user_id = ${params.userId}
ORDER BY ticker
`;
.get('/holdings', async ({ request, set }) => {
try {
const user = await requireSessionUser(request);
const holdings = await db`
SELECT
id,
user_id,
ticker,
shares,
avg_cost,
current_price,
market_value,
gain_loss,
gain_loss_pct,
last_price_at,
created_at,
updated_at
FROM holdings
WHERE user_id = ${user.id}
ORDER BY market_value DESC, ticker ASC
`;
return holdings;
return { holdings };
} catch (error) {
return toHttpError(set, error);
}
})
.post('/holdings', async ({ request, set, body }) => {
try {
const user = await requireSessionUser(request);
const ticker = body.ticker.trim().toUpperCase();
.post('/', async ({ body }) => {
const result = await db`
INSERT INTO portfolio ${db(body as Portfolio)}
ON CONFLICT (user_id, ticker)
DO UPDATE SET
shares = EXCLUDED.shares,
avg_cost = EXCLUDED.avg_cost,
current_price = EXCLUDED.current_price
RETURNING *
`;
const rows = await db`
INSERT INTO holdings (
user_id,
ticker,
shares,
avg_cost,
current_price
) VALUES (
${user.id},
${ticker},
${body.shares},
${body.avgCost},
${body.currentPrice ?? null}
)
ON CONFLICT (user_id, ticker)
DO UPDATE SET
shares = EXCLUDED.shares,
avg_cost = EXCLUDED.avg_cost,
current_price = COALESCE(EXCLUDED.current_price, holdings.current_price),
updated_at = NOW()
RETURNING *
`;
return result[0];
return { holding: rows[0] };
} catch (error) {
return toHttpError(set, error);
}
}, {
body: t.Object({
user_id: t.String(),
ticker: t.String(),
shares: t.Number(),
avg_cost: t.Number(),
current_price: t.Optional(t.Number())
ticker: t.String({ minLength: 1, maxLength: 12 }),
shares: t.Number({ minimum: 0.0001 }),
avgCost: t.Number({ minimum: 0.0001 }),
currentPrice: t.Optional(t.Number({ minimum: 0 }))
})
})
.patch('/holdings/:id', async ({ request, set, params, body }) => {
try {
const user = await requireSessionUser(request);
const rows = await db`
UPDATE holdings
SET
shares = COALESCE(${body.shares ?? null}, shares),
avg_cost = COALESCE(${body.avgCost ?? null}, avg_cost),
current_price = COALESCE(${body.currentPrice ?? null}, current_price),
updated_at = NOW()
WHERE id = ${params.id}
AND user_id = ${user.id}
RETURNING *
`;
.put('/:id', async ({ params, body }) => {
const result = await db`
UPDATE portfolio
SET ${db(body)}
WHERE id = ${params.id}
RETURNING *
`;
if (!rows[0]) {
set.status = 404;
return { error: 'Holding not found' };
}
return result[0] || null;
return { holding: rows[0] };
} catch (error) {
return toHttpError(set, error);
}
}, {
params: t.Object({
id: t.Numeric()
}),
body: t.Object({
shares: t.Optional(t.Number({ minimum: 0.0001 })),
avgCost: t.Optional(t.Number({ minimum: 0.0001 })),
currentPrice: t.Optional(t.Number({ minimum: 0 }))
})
})
.delete('/holdings/:id', async ({ request, set, params }) => {
try {
const user = await requireSessionUser(request);
const rows = await db`
DELETE FROM holdings
WHERE id = ${params.id}
AND user_id = ${user.id}
RETURNING id
`;
.delete('/:id', async ({ params }) => {
await db`DELETE FROM portfolio WHERE id = ${params.id}`;
return { success: true };
if (!rows[0]) {
set.status = 404;
return { error: 'Holding not found' };
}
return { success: true };
} catch (error) {
return toHttpError(set, error);
}
}, {
params: t.Object({
id: t.Numeric()
})
})
.get('/summary', async ({ request, set }) => {
try {
const user = await requireSessionUser(request);
const rows = await db`
SELECT
COUNT(*)::int AS positions,
COALESCE(SUM(market_value), 0)::numeric AS total_value,
COALESCE(SUM(gain_loss), 0)::numeric AS total_gain_loss,
COALESCE(SUM(shares * avg_cost), 0)::numeric AS total_cost_basis,
COALESCE(AVG(gain_loss_pct), 0)::numeric AS avg_return_pct
FROM holdings
WHERE user_id = ${user.id}
`;
.get('/:userId/summary', async ({ params }) => {
const summary = await db`
SELECT
COUNT(*) as total_positions,
COALESCE(SUM(current_value), 0) as total_value,
COALESCE(SUM(gain_loss), 0) as total_gain_loss,
COALESCE(SUM(current_value) - SUM(shares * avg_cost), 0) as cost_basis
FROM portfolio
WHERE user_id = ${params.userId}
`;
return { summary: rows[0] };
} catch (error) {
return toHttpError(set, error);
}
})
.post('/refresh-prices', async ({ request, set }) => {
try {
const user = await requireSessionUser(request);
const task = await enqueueTask({
taskType: 'refresh_prices',
payload: { userId: user.id },
createdBy: user.id,
priority: 80
});
return summary[0];
return { task };
} catch (error) {
return toHttpError(set, error);
}
})
.post('/insights/generate', async ({ request, set }) => {
try {
const user = await requireSessionUser(request);
const task = await enqueueTask({
taskType: 'portfolio_insights',
payload: { userId: user.id },
createdBy: user.id,
priority: 70
});
return { task };
} catch (error) {
return toHttpError(set, error);
}
})
.get('/insights/latest', async ({ request, set }) => {
try {
const user = await requireSessionUser(request);
const rows = await db`
SELECT id, user_id, provider, model, content, created_at
FROM portfolio_insights
WHERE user_id = ${user.id}
ORDER BY created_at DESC
LIMIT 1
`;
return { insight: rows[0] ?? null };
} catch (error) {
return toHttpError(set, error);
}
});

View File

@@ -0,0 +1,40 @@
import { Elysia, t } from 'elysia';
import { requireSessionUser } from '../session';
import { getTaskById, listRecentTasks } from '../tasks/repository';
import { toHttpError } from './error';
export const taskRoutes = new Elysia({ prefix: '/tasks' })
  /**
   * List the caller's most recent tasks.
   * `limit` is clamped to the 1–50 range and defaults to 20.
   */
  .get('/', async ({ request, set, query }) => {
    try {
      const user = await requireSessionUser(request);
      const requested = Number(query.limit ?? 20);
      const safeLimit = Number.isFinite(requested)
        ? Math.max(1, Math.min(50, requested))
        : 20;
      const tasks = await listRecentTasks(user.id, safeLimit);
      return { tasks };
    } catch (error) {
      return toHttpError(set, error);
    }
  }, {
    query: t.Object({
      limit: t.Optional(t.Numeric())
    })
  })
  /**
   * Fetch one task by id, scoped to the calling user.
   * Responds 404 when the task does not exist or is not visible.
   */
  .get('/:taskId', async ({ request, set, params }) => {
    try {
      const user = await requireSessionUser(request);
      const task = await getTaskById(params.taskId, user.id);
      if (!task) {
        set.status = 404;
        return { error: 'Task not found' };
      }
      return { task };
    } catch (error) {
      return toHttpError(set, error);
    }
  }, {
    params: t.Object({
      taskId: t.String()
    })
  });

View File

@@ -1,35 +1,80 @@
import { Elysia, t } from 'elysia';
import { db } from '../db';
import { requireSessionUser } from '../session';
import { toHttpError } from './error';
export const watchlistRoutes = new Elysia({ prefix: '/watchlist' })
.get('/:userId', async ({ params }) => {
const watchlist = await db`
SELECT * FROM watchlist
WHERE user_id = ${params.userId}
ORDER BY created_at DESC
`;
.get('/', async ({ request, set }) => {
try {
const user = await requireSessionUser(request);
const watchlist = await db`
SELECT id, user_id, ticker, company_name, sector, created_at
FROM watchlist
WHERE user_id = ${user.id}
ORDER BY created_at DESC
`;
return watchlist;
return { items: watchlist };
} catch (error) {
return toHttpError(set, error);
}
})
.post('/', async ({ request, set, body }) => {
try {
const user = await requireSessionUser(request);
const ticker = body.ticker.trim().toUpperCase();
.post('/', async ({ body }) => {
const result = await db`
INSERT INTO watchlist ${db(body)}
ON CONFLICT (user_id, ticker) DO NOTHING
RETURNING *
`;
const rows = await db`
INSERT INTO watchlist (
user_id,
ticker,
company_name,
sector
) VALUES (
${user.id},
${ticker},
${body.companyName.trim()},
${body.sector?.trim() || null}
)
ON CONFLICT (user_id, ticker)
DO UPDATE SET
company_name = EXCLUDED.company_name,
sector = EXCLUDED.sector
RETURNING *
`;
return result[0];
return { item: rows[0] };
} catch (error) {
return toHttpError(set, error);
}
}, {
body: t.Object({
user_id: t.String(),
ticker: t.String(),
company_name: t.String(),
sector: t.Optional(t.String())
ticker: t.String({ minLength: 1, maxLength: 12 }),
companyName: t.String({ minLength: 1, maxLength: 200 }),
sector: t.Optional(t.String({ maxLength: 120 }))
})
})
.delete('/:id', async ({ request, set, params }) => {
try {
const user = await requireSessionUser(request);
const rows = await db`
DELETE FROM watchlist
WHERE id = ${params.id}
AND user_id = ${user.id}
RETURNING id
`;
.delete('/:id', async ({ params }) => {
await db`DELETE FROM watchlist WHERE id = ${params.id}`;
return { success: true };
if (!rows[0]) {
set.status = 404;
return { error: 'Watchlist item not found' };
}
return { success: true };
} catch (error) {
return toHttpError(set, error);
}
}, {
params: t.Object({
id: t.Numeric()
})
});

View File

@@ -0,0 +1,61 @@
import { env } from '../config';
/** Shape of the OpenAI-compatible chat-completion response we consume. */
type ChatCompletionResponse = {
  choices?: Array<{
    message?: {
      content?: string;
    };
  }>;
};

/** Chat message in the OpenAI-compatible wire format. */
type ChatMessage = {
  role: 'system' | 'user';
  content: string;
};

/**
 * Thin client for an OpenClaw/ZeroClaw OpenAI-compatible chat endpoint.
 * When credentials are missing it returns a static fallback payload so
 * callers never have to special-case unconfigured environments.
 */
export class OpenClawService {
  /** True when both OPENCLAW_BASE_URL and OPENCLAW_API_KEY are set. */
  isConfigured() {
    return Boolean(env.OPENCLAW_BASE_URL && env.OPENCLAW_API_KEY);
  }

  /**
   * Run a single-turn analysis prompt.
   *
   * @param prompt user-role content sent to the model
   * @param systemPrompt optional system-role framing message
   * @returns provider/model provenance plus the trimmed completion text
   * @throws Error when the HTTP call fails or the completion is empty
   */
  async runAnalysis(prompt: string, systemPrompt?: string) {
    if (!this.isConfigured()) {
      return {
        provider: 'local-fallback',
        model: env.OPENCLAW_MODEL,
        text: 'OpenClaw/ZeroClaw is not configured. Set OPENCLAW_BASE_URL and OPENCLAW_API_KEY to enable live AI analysis.'
      };
    }

    // Build the message list explicitly. The previous pattern pushed a
    // nullable entry and relied on .filter(Boolean), which leaves the array
    // typed as (message | null)[] for the compiler; this keeps it sound.
    const messages: ChatMessage[] = [];
    if (systemPrompt) {
      messages.push({ role: 'system', content: systemPrompt });
    }
    messages.push({ role: 'user', content: prompt });

    const response = await fetch(`${env.OPENCLAW_BASE_URL}/v1/chat/completions`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${env.OPENCLAW_API_KEY}`
      },
      body: JSON.stringify({
        model: env.OPENCLAW_MODEL,
        temperature: 0.2,
        messages
      })
    });

    if (!response.ok) {
      // Truncate upstream error bodies so log lines stay bounded.
      const body = await response.text();
      throw new Error(`OpenClaw request failed (${response.status}): ${body.slice(0, 200)}`);
    }

    const payload = await response.json() as ChatCompletionResponse;
    const text = payload.choices?.[0]?.message?.content?.trim();
    if (!text) {
      throw new Error('OpenClaw returned an empty response');
    }

    return {
      provider: 'openclaw',
      model: env.OPENCLAW_MODEL,
      text
    };
  }
}

View File

@@ -1,116 +1,72 @@
import { db } from '../db';
const YAHOO_BASE = 'https://query1.finance.yahoo.com/v8/finance/chart';
export class PriceService {
private baseUrl = 'https://query1.finance.yahoo.com/v8/finance/chart';
async getQuote(ticker: string): Promise<number | null> {
const normalizedTicker = ticker.trim().toUpperCase();
/**
* Get current price for a ticker
*/
async getPrice(ticker: string): Promise<number | null> {
try {
const response = await fetch(
`${this.baseUrl}/${ticker}?interval=1d&range=1d`,
{
headers: {
'User-Agent': 'Mozilla/5.0 (compatible; FiscalClone/1.0)'
}
const response = await fetch(`${YAHOO_BASE}/${normalizedTicker}?interval=1d&range=1d`, {
headers: {
'User-Agent': 'Mozilla/5.0 (compatible; FiscalClone/2.0)'
}
);
});
if (!response.ok) return null;
const data = await response.json();
const result = data.chart?.result?.[0];
if (!result?.meta?.regularMarketPrice) {
if (!response.ok) {
return null;
}
return result.meta.regularMarketPrice;
} catch (error) {
console.error(`Error fetching price for ${ticker}:`, error);
const payload = await response.json() as {
chart?: {
result?: Array<{ meta?: { regularMarketPrice?: number } }>;
};
};
const price = payload.chart?.result?.[0]?.meta?.regularMarketPrice;
return typeof price === 'number' ? price : null;
} catch {
return null;
}
}
/**
* Get historical prices
*/
async getHistoricalPrices(ticker: string, period: string = '1y'): Promise<Array<{ date: string, price: number }>> {
try {
const response = await fetch(
`${this.baseUrl}/${ticker}?interval=1d&range=${period}`,
{
headers: {
'User-Agent': 'Mozilla/5.0 (compatible; FiscalClone/1.0)'
}
}
);
async refreshHoldingsPrices(userId?: number) {
const holdings = userId
? await db`SELECT DISTINCT ticker FROM holdings WHERE user_id = ${userId}`
: await db`SELECT DISTINCT ticker FROM holdings`;
if (!response.ok) return [];
let updatedCount = 0;
const data = await response.json();
const result = data.chart?.result?.[0];
for (const holding of holdings) {
const price = await this.getQuote(holding.ticker);
if (!result?.timestamp || !result?.indicators?.quote?.[0]?.close) {
return [];
if (price === null) {
continue;
}
const timestamps = result.timestamp;
const closes = result.indicators.quote[0].close;
return timestamps.map((ts: number, i: number) => ({
date: new Date(ts * 1000).toISOString(),
price: closes[i]
})).filter((p: any) => p.price !== null);
} catch (error) {
console.error(`Error fetching historical prices for ${ticker}:`, error);
return [];
}
}
/**
* Update all portfolio prices
*/
async updateAllPrices(db: any) {
const holdings = await db`
SELECT DISTINCT ticker FROM portfolio
`;
let updated = 0;
for (const { ticker } of holdings) {
const price = await this.getPrice(ticker);
if (price) {
if (userId) {
await db`
UPDATE portfolio
SET current_price = ${price}
WHERE ticker = ${ticker}
UPDATE holdings
SET current_price = ${price}, last_price_at = NOW(), updated_at = NOW()
WHERE user_id = ${userId} AND ticker = ${holding.ticker}
`;
} else {
await db`
UPDATE holdings
SET current_price = ${price}, last_price_at = NOW(), updated_at = NOW()
WHERE ticker = ${holding.ticker}
`;
updated++;
}
// Rate limiting
await new Promise(resolve => setTimeout(resolve, 100));
updatedCount += 1;
await Bun.sleep(120);
}
console.log(`Updated ${updated} stock prices`);
}
/**
* Get quote for multiple tickers
*/
async getQuotes(tickers: string[]): Promise<Record<string, number>> {
const quotes: Record<string, number> = {};
await Promise.all(
tickers.map(async ticker => {
const price = await this.getPrice(ticker);
if (price) {
quotes[ticker] = price;
}
})
);
return quotes;
return {
updatedCount,
totalTickers: holdings.length
};
}
}

View File

@@ -1,162 +1,208 @@
import { type Filings } from '../db';
import { env } from '../config';
import type { FilingMetrics, FilingType } from '../types';
export class SECScraper {
private baseUrl = 'https://www.sec.gov';
private userAgent = 'Fiscal Clone (contact@example.com)';
type TickerDirectoryRecord = {
cik_str: number;
ticker: string;
title: string;
};
/**
* Search SEC filings by ticker
*/
async searchFilings(ticker: string, count = 20): Promise<Filings[]> {
const cik = await this.getCIK(ticker);
type RecentFilingsPayload = {
filings?: {
recent?: {
accessionNumber?: string[];
filingDate?: string[];
form?: string[];
primaryDocument?: string[];
};
};
cik?: string;
name?: string;
};
const response = await fetch(
`https://data.sec.gov/submissions/CIK${cik.padStart(10, '0')}.json`,
{
headers: {
'User-Agent': this.userAgent
}
type CompanyFactsPayload = {
facts?: {
'us-gaap'?: Record<string, { units?: Record<string, Array<{ val?: number; end?: string; filed?: string }>> }>;
};
};
export type SecFiling = {
ticker: string;
cik: string;
companyName: string;
filingType: FilingType;
filingDate: string;
accessionNumber: string;
filingUrl: string | null;
};
const SUPPORTED_FORMS: FilingType[] = ['10-K', '10-Q', '8-K'];
const TICKER_CACHE_TTL_MS = 1000 * 60 * 60 * 24;
const FACTS_CACHE_TTL_MS = 1000 * 60 * 10;
export class SecService {
private tickerCache: Map<string, TickerDirectoryRecord> = new Map();
private tickerCacheLoadedAt = 0;
private factsCache: Map<string, { loadedAt: number; metrics: FilingMetrics }> = new Map();
private async fetchJson<T>(url: string): Promise<T> {
const response = await fetch(url, {
headers: {
'User-Agent': env.SEC_USER_AGENT,
Accept: 'application/json'
}
);
});
if (!response.ok) {
throw new Error(`SEC API error: ${response.status}`);
throw new Error(`SEC request failed (${response.status}) for ${url}`);
}
const data = await response.json();
const filings = data.filings?.recent || [];
const filteredFilings = filings
.filter((f: any) =>
['10-K', '10-Q', '8-K'].includes(f.form)
)
.slice(0, count)
.map((f: any) => ({
ticker,
filing_type: f.form,
filing_date: new Date(f.filingDate),
accession_number: f.accessionNumber,
cik: data.cik,
company_name: data.name || ticker,
}));
return filteredFilings;
return await response.json() as T;
}
/**
* Check for new filings and save to database
*/
async checkNewFilings(db: any) {
const tickers = await db`
SELECT DISTINCT ticker FROM watchlist
`;
private async ensureTickerCache() {
const isFresh = Date.now() - this.tickerCacheLoadedAt < TICKER_CACHE_TTL_MS;
console.log(`Checking filings for ${tickers.length} tickers...`);
for (const { ticker } of tickers) {
try {
const latest = await db`
SELECT accession_number FROM filings
WHERE ticker = ${ticker}
ORDER BY filing_date DESC
LIMIT 1
`;
const filings = await this.searchFilings(ticker, 10);
const newFilings = filings.filter(
f => !latest.some((l: any) => l.accession_number === f.accession_number)
);
if (newFilings.length > 0) {
console.log(`Found ${newFilings.length} new filings for ${ticker}`);
for (const filing of newFilings) {
const metrics = await this.extractKeyMetrics(filing);
await db`
INSERT INTO filings ${db(filing, metrics)}
ON CONFLICT (accession_number) DO NOTHING
`;
}
}
} catch (error) {
console.error(`Error checking filings for ${ticker}:`, error);
}
}
}
/**
* Get CIK for a ticker
*/
private async getCIK(ticker: string): Promise<string> {
const response = await fetch(
`https://www.sec.gov/files/company_tickers.json`
);
if (!response.ok) {
throw new Error('Failed to get company tickers');
if (isFresh && this.tickerCache.size > 0) {
return;
}
const data = await response.json();
const companies = data.data;
const payload = await this.fetchJson<Record<string, TickerDirectoryRecord>>('https://www.sec.gov/files/company_tickers.json');
const nextCache = new Map<string, TickerDirectoryRecord>();
for (const [cik, company] of Object.entries(companies)) {
if (company.ticker === ticker.toUpperCase()) {
return cik;
}
for (const record of Object.values(payload)) {
nextCache.set(record.ticker.toUpperCase(), record);
}
throw new Error(`Ticker ${ticker} not found`);
this.tickerCache = nextCache;
this.tickerCacheLoadedAt = Date.now();
}
/**
* Extract key metrics from filing
*/
async extractKeyMetrics(filing: any): Promise<any> {
try {
const filingUrl = `${this.baseUrl}/Archives/${filing.accession_number.replace(/-/g, '')}/${filing.accession_number}-index.htm`;
async resolveTicker(ticker: string) {
await this.ensureTickerCache();
const response = await fetch(filingUrl, {
headers: { 'User-Agent': this.userAgent }
});
const normalizedTicker = ticker.trim().toUpperCase();
const record = this.tickerCache.get(normalizedTicker);
if (!response.ok) return null;
const html = await response.text();
// Extract key financial metrics from XBRL
const metrics = {
revenue: this.extractMetric(html, 'Revenues'),
netIncome: this.extractMetric(html, 'NetIncomeLoss'),
totalAssets: this.extractMetric(html, 'Assets'),
cash: this.extractMetric(html, 'CashAndCashEquivalentsAtCarryingValue'),
debt: this.extractMetric(html, 'LongTermDebt')
};
return metrics;
} catch (error) {
console.error('Error extracting metrics:', error);
return null;
if (!record) {
throw new Error(`Ticker ${normalizedTicker} was not found in SEC directory`);
}
}
/**
* Extract a specific metric from XBRL data
*/
private extractMetric(html: string, metricName: string): number | null {
const regex = new RegExp(`<ix:nonFraction[^>]*name="[^"]*${metricName}[^"]*"[^>]*>([^<]+)<`, 'i');
const match = html.match(regex);
return match ? parseFloat(match[1].replace(/,/g, '')) : null;
}
/**
* Get filing details by accession number
*/
async getFilingDetails(accessionNumber: string) {
const filingUrl = `${this.baseUrl}/Archives/${accessionNumber.replace(/-/g, '')}/${accessionNumber}-index.htm`;
return {
filing_url: filingUrl
ticker: normalizedTicker,
cik: String(record.cik_str),
companyName: record.title
};
}
async fetchRecentFilings(ticker: string, limit = 20): Promise<SecFiling[]> {
const company = await this.resolveTicker(ticker);
const cikPadded = company.cik.padStart(10, '0');
const payload = await this.fetchJson<RecentFilingsPayload>(`https://data.sec.gov/submissions/CIK${cikPadded}.json`);
const recent = payload.filings?.recent;
if (!recent) {
return [];
}
const forms = recent.form ?? [];
const accessionNumbers = recent.accessionNumber ?? [];
const filingDates = recent.filingDate ?? [];
const primaryDocuments = recent.primaryDocument ?? [];
const filings: SecFiling[] = [];
for (let i = 0; i < forms.length; i += 1) {
const filingType = forms[i] as FilingType;
if (!SUPPORTED_FORMS.includes(filingType)) {
continue;
}
const accessionNumber = accessionNumbers[i];
if (!accessionNumber) {
continue;
}
const compactAccession = accessionNumber.replace(/-/g, '');
const documentName = primaryDocuments[i];
const filingUrl = documentName
? `https://www.sec.gov/Archives/edgar/data/${Number(company.cik)}/${compactAccession}/${documentName}`
: null;
filings.push({
ticker: company.ticker,
cik: company.cik,
companyName: payload.name ?? company.companyName,
filingType,
filingDate: filingDates[i] ?? new Date().toISOString().slice(0, 10),
accessionNumber,
filingUrl
});
if (filings.length >= limit) {
break;
}
}
return filings;
}
private pickLatestFact(payload: CompanyFactsPayload, tag: string): number | null {
const unitCollections = payload.facts?.['us-gaap']?.[tag]?.units;
if (!unitCollections) {
return null;
}
const preferredUnits = ['USD', 'USD/shares'];
for (const unit of preferredUnits) {
const series = unitCollections[unit];
if (!series?.length) {
continue;
}
const best = [...series]
.filter((item) => typeof item.val === 'number')
.sort((a, b) => {
const aDate = Date.parse(a.filed ?? a.end ?? '1970-01-01');
const bDate = Date.parse(b.filed ?? b.end ?? '1970-01-01');
return bDate - aDate;
})[0];
if (best?.val !== undefined) {
return best.val;
}
}
return null;
}
async fetchMetrics(cik: string): Promise<FilingMetrics> {
const normalized = cik.padStart(10, '0');
const cached = this.factsCache.get(normalized);
if (cached && Date.now() - cached.loadedAt < FACTS_CACHE_TTL_MS) {
return cached.metrics;
}
const payload = await this.fetchJson<CompanyFactsPayload>(`https://data.sec.gov/api/xbrl/companyfacts/CIK${normalized}.json`);
const metrics: FilingMetrics = {
revenue: this.pickLatestFact(payload, 'Revenues'),
netIncome: this.pickLatestFact(payload, 'NetIncomeLoss'),
totalAssets: this.pickLatestFact(payload, 'Assets'),
cash: this.pickLatestFact(payload, 'CashAndCashEquivalentsAtCarryingValue'),
debt: this.pickLatestFact(payload, 'LongTermDebt')
};
this.factsCache.set(normalized, {
loadedAt: Date.now(),
metrics
});
return metrics;
}
}

30
backend/src/session.ts Normal file
View File

@@ -0,0 +1,30 @@
import { auth } from './auth';
import type { SessionUser } from './types';
/** Thrown when a request does not carry a valid authenticated session. */
export class UnauthorizedError extends Error {
  constructor(reason = 'Authentication required') {
    super(reason);
    // Make the error identifiable by name in logs and error mappers.
    this.name = 'UnauthorizedError';
  }
}
/**
 * Resolve the authenticated user for a request or throw UnauthorizedError.
 * The better-auth session user id is a string; it is converted to the
 * numeric id used by the rest of the schema.
 */
export async function requireSessionUser(request: Request): Promise<SessionUser> {
  const session = await auth.api.getSession({ headers: request.headers });
  if (!session?.user?.id) {
    throw new UnauthorizedError();
  }
  const numericId = Number(session.user.id);
  if (!Number.isFinite(numericId)) {
    throw new UnauthorizedError('Invalid session user id');
  }
  const { user } = session;
  return {
    id: numericId,
    email: user.email,
    name: user.name ?? null,
    image: user.image ?? null
  };
}

View File

@@ -0,0 +1,201 @@
import { z } from 'zod';
import { db } from '../db';
import { OpenClawService } from '../services/openclaw';
import { PriceService } from '../services/prices';
import { SecService } from '../services/sec';
import type { LongTaskRecord, TaskType } from '../types';
// Shared service singletons used by every processor in this module.
const secService = new SecService();
const priceService = new PriceService();
const openClawService = new OpenClawService();

// Zod schemas validating each task type's payload before it is processed.
// sync_filings: which ticker to pull and how many filings (capped at 50).
const syncFilingsPayload = z.object({
  ticker: z.string().min(1),
  limit: z.number().int().positive().max(50).default(20)
});
// refresh_prices: optional user scope; omitted means refresh all holdings.
const refreshPricesPayload = z.object({
  userId: z.number().int().positive().optional()
});
// analyze_filing: SEC accession number identifying the stored filing.
const analyzeFilingPayload = z.object({
  accessionNumber: z.string().min(8)
});
// portfolio_insights: the user whose holdings are analyzed.
const portfolioInsightsPayload = z.object({
  userId: z.number().int().positive()
});
/**
 * Processor for `sync_filings` tasks: fetches recent SEC filings for one
 * ticker and upserts them into the `filings` table.
 *
 * Metrics are fetched once, from the first filing's CIK — all filings in a
 * batch belong to the same company — and attached to every upserted row.
 */
async function processSyncFilings(task: LongTaskRecord) {
  const { ticker, limit } = syncFilingsPayload.parse(task.payload);
  const filings = await secService.fetchRecentFilings(ticker, limit);
  // Company-level XBRL metrics; null when no filings were returned.
  const metrics = filings.length > 0
    ? await secService.fetchMetrics(filings[0].cik)
    : null;
  let touched = 0;
  for (const filing of filings) {
    // Idempotent upsert keyed by accession number; COALESCE preserves
    // previously stored metrics when the new payload has none.
    await db`
      INSERT INTO filings (
        ticker,
        filing_type,
        filing_date,
        accession_number,
        cik,
        company_name,
        filing_url,
        metrics,
        updated_at
      ) VALUES (
        ${filing.ticker},
        ${filing.filingType},
        ${filing.filingDate},
        ${filing.accessionNumber},
        ${filing.cik},
        ${filing.companyName},
        ${filing.filingUrl},
        ${metrics},
        NOW()
      )
      ON CONFLICT (accession_number)
      DO UPDATE SET
        filing_type = EXCLUDED.filing_type,
        filing_date = EXCLUDED.filing_date,
        filing_url = EXCLUDED.filing_url,
        metrics = COALESCE(EXCLUDED.metrics, filings.metrics),
        updated_at = NOW()
    `;
    touched += 1;
  }
  return {
    ticker: ticker.toUpperCase(),
    filingsFetched: filings.length,
    recordsUpserted: touched,
    metrics
  };
}
/**
 * Processor for `refresh_prices` tasks. Refreshes quotes for one user's
 * holdings when a userId is supplied, otherwise for every holding.
 */
async function processRefreshPrices(task: LongTaskRecord) {
  const { userId } = refreshPricesPayload.parse(task.payload);
  const outcome = await priceService.refreshHoldingsPrices(userId);
  const scope = userId ? `user:${userId}` : 'global';
  return { scope, ...outcome };
}
/**
 * Processor for `analyze_filing` tasks: runs an AI analysis over a stored
 * filing and persists the result back onto the filing row.
 * @throws Error when the accession number matches no stored filing.
 */
async function processAnalyzeFiling(task: LongTaskRecord) {
  const { accessionNumber } = analyzeFilingPayload.parse(task.payload);
  const rows = await db`
    SELECT *
    FROM filings
    WHERE accession_number = ${accessionNumber}
    LIMIT 1
  `;
  const filing = rows[0];
  if (!filing) {
    throw new Error(`Filing ${accessionNumber} was not found`);
  }
  const prompt = [
    'You are a fiscal research assistant focused on regulatory signals.',
    `Analyze this SEC filing from ${filing.company_name} (${filing.ticker}).`,
    `Form: ${filing.filing_type}`,
    `Filed: ${filing.filing_date}`,
    `Metrics JSON: ${JSON.stringify(filing.metrics ?? {})}`,
    'Return concise sections: Thesis, Red Flags, Follow-up Questions, Portfolio Impact.'
  ].join('\n');
  const analysis = await openClawService.runAnalysis(prompt, 'Use concise institutional analyst language.');
  // NOTE(review): `analysis` is an object ({provider, model, text}); this
  // assumes the `analysis` column is json/jsonb — confirm in the migration.
  await db`
    UPDATE filings
    SET analysis = ${analysis},
        updated_at = NOW()
    WHERE accession_number = ${accessionNumber}
  `;
  return {
    accessionNumber,
    analysis
  };
}
/**
 * Processor for `portfolio_insights` tasks: snapshots the user's holdings
 * and aggregate stats, asks the AI service for an assessment, and stores
 * the generated insight in `portfolio_insights`.
 */
async function processPortfolioInsights(task: LongTaskRecord) {
  const { userId } = portfolioInsightsPayload.parse(task.payload);
  // Ordered by position size so the prompt leads with the largest holdings.
  const holdings = await db`
    SELECT
      ticker,
      shares,
      avg_cost,
      current_price,
      market_value,
      gain_loss,
      gain_loss_pct
    FROM holdings
    WHERE user_id = ${userId}
    ORDER BY market_value DESC
  `;
  const summaryRows = await db`
    SELECT
      COUNT(*)::int AS positions,
      COALESCE(SUM(market_value), 0)::numeric AS total_value,
      COALESCE(SUM(gain_loss), 0)::numeric AS total_gain_loss,
      COALESCE(AVG(gain_loss_pct), 0)::numeric AS avg_return_pct
    FROM holdings
    WHERE user_id = ${userId}
  `;
  // The aggregate query always yields one row; the fallback is defensive.
  const summary = summaryRows[0] ?? {
    positions: 0,
    total_value: 0,
    total_gain_loss: 0,
    avg_return_pct: 0
  };
  const prompt = [
    'Generate portfolio intelligence with actionable recommendations.',
    `Portfolio summary: ${JSON.stringify(summary)}`,
    `Holdings: ${JSON.stringify(holdings)}`,
    'Respond with: 1) Portfolio health score (0-100), 2) top 3 risks, 3) top 3 opportunities, 4) next actions in 7 days.'
  ].join('\n');
  const insight = await openClawService.runAnalysis(prompt, 'Act as a risk-aware buy-side analyst.');
  await db`
    INSERT INTO portfolio_insights (user_id, model, provider, content)
    VALUES (${userId}, ${insight.model}, ${insight.provider}, ${insight.text})
  `;
  return {
    userId,
    summary,
    insight
  };
}
/** Dispatch table mapping each task type to its processor implementation. */
const processors: Record<TaskType, (task: LongTaskRecord) => Promise<Record<string, unknown>>> = {
  sync_filings: processSyncFilings,
  refresh_prices: processRefreshPrices,
  analyze_filing: processAnalyzeFiling,
  portfolio_insights: processPortfolioInsights
};

/**
 * Execute a claimed task by routing it to its registered processor.
 * @throws Error when no processor exists for the task's type.
 */
export async function processTask(task: LongTaskRecord) {
  const handler = processors[task.task_type];
  if (!handler) {
    throw new Error(`No processor registered for task ${task.task_type}`);
  }
  return handler(task);
}

View File

@@ -0,0 +1,168 @@
import { db } from '../db';
import { env } from '../config';
import type { LongTaskRecord, TaskType } from '../types';
/** Input for enqueueTask; optional fields fall back to defaults below. */
type EnqueueTaskInput = {
  taskType: TaskType;
  payload: Record<string, unknown>;
  createdBy?: number;   // owning user id; omitted means a system task
  priority?: number;    // higher runs first (default 50)
  scheduledAt?: Date;   // earliest start time (default: now)
  maxAttempts?: number; // retry budget (default: env.TASK_MAX_ATTEMPTS)
};

/**
 * Insert a new long-running task in the 'queued' state.
 * @returns the freshly inserted task row.
 */
export async function enqueueTask(input: EnqueueTaskInput) {
  const task = await db<LongTaskRecord[]>`
    INSERT INTO long_tasks (
      task_type,
      status,
      priority,
      payload,
      max_attempts,
      scheduled_at,
      created_by
    ) VALUES (
      ${input.taskType},
      'queued',
      ${input.priority ?? 50},
      ${input.payload},
      ${input.maxAttempts ?? env.TASK_MAX_ATTEMPTS},
      ${input.scheduledAt ?? new Date()},
      ${input.createdBy ?? null}
    )
    RETURNING *
  `;
  return task[0];
}
/**
 * Load one task by id. When `userId` is given the lookup is scoped so a
 * user only sees their own tasks (system tasks with NULL created_by stay
 * visible to everyone).
 * @returns the task row, or null when not found / not visible.
 */
export async function getTaskById(taskId: string, userId?: number) {
  const rows = userId
    ? await db<LongTaskRecord[]>`
        SELECT *
        FROM long_tasks
        WHERE id = ${taskId}
          AND (created_by IS NULL OR created_by = ${userId})
        LIMIT 1
      `
    : await db<LongTaskRecord[]>`
        SELECT *
        FROM long_tasks
        WHERE id = ${taskId}
        LIMIT 1
      `;
  return rows[0] ?? null;
}
/** List the newest tasks created by one user, capped at `limit` rows. */
export async function listRecentTasks(userId: number, limit = 20) {
  return await db<LongTaskRecord[]>`
    SELECT *
    FROM long_tasks
    WHERE created_by = ${userId}
    ORDER BY created_at DESC
    LIMIT ${limit}
  `;
}
/**
 * Atomically claim the next runnable task.
 *
 * Runs in a single transaction:
 *  1. Re-queue 'running' tasks whose heartbeat went stale (the worker
 *     holding them likely died) while they still have attempts left.
 *  2. Mark stale tasks that exhausted their attempts as 'failed'.
 *  3. Claim the highest-priority, oldest queued task; FOR UPDATE SKIP
 *     LOCKED lets concurrent workers poll without grabbing the same row.
 *
 * @returns the claimed task (now 'running', attempts incremented), or
 *          null when the queue is empty.
 */
export async function claimNextTask() {
  const staleSeconds = env.TASK_STALE_SECONDS;
  return await db.begin(async (tx) => {
    // Step 1: recover expired leases that can still retry.
    await tx`
      UPDATE long_tasks
      SET status = 'queued',
          heartbeat_at = NULL,
          started_at = NULL,
          updated_at = NOW(),
          error = COALESCE(error, 'Task lease expired and was re-queued')
      WHERE status = 'running'
        AND heartbeat_at IS NOT NULL
        AND heartbeat_at < NOW() - (${staleSeconds}::text || ' seconds')::interval
        AND attempts < max_attempts
    `;
    // Step 2: expired leases with no retries left become terminal failures.
    await tx`
      UPDATE long_tasks
      SET status = 'failed',
          finished_at = NOW(),
          updated_at = NOW(),
          error = COALESCE(error, 'Task lease expired and max attempts reached')
      WHERE status = 'running'
        AND heartbeat_at IS NOT NULL
        AND heartbeat_at < NOW() - (${staleSeconds}::text || ' seconds')::interval
        AND attempts >= max_attempts
    `;
    // Step 3: claim one queued, due task ordered by priority then FIFO.
    const rows = await tx<LongTaskRecord[]>`
      WITH candidate AS (
        SELECT id
        FROM long_tasks
        WHERE status = 'queued'
          AND scheduled_at <= NOW()
        ORDER BY priority DESC, created_at ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 1
      )
      UPDATE long_tasks t
      SET status = 'running',
          started_at = COALESCE(t.started_at, NOW()),
          heartbeat_at = NOW(),
          attempts = t.attempts + 1,
          updated_at = NOW()
      FROM candidate
      WHERE t.id = candidate.id
      RETURNING t.*
    `;
    return rows[0] ?? null;
  });
}
/** Renews the lease on a running task; a no-op for any other status. */
export async function heartbeatTask(taskId: string) {
  await db`
    UPDATE long_tasks
    SET heartbeat_at = NOW(), updated_at = NOW()
    WHERE id = ${taskId}
      AND status = 'running'
  `;
}
/** Records a successful run: stores the result, clears any error, stamps times. */
export async function completeTask(taskId: string, result: Record<string, unknown>) {
  await db`
    UPDATE long_tasks
    SET status = 'completed',
        result = ${result},
        error = NULL,
        finished_at = NOW(),
        heartbeat_at = NOW(),
        updated_at = NOW()
    WHERE id = ${taskId}
  `;
}
/**
 * Records a failed run. While attempts remain the task is re-queued with a
 * delay (default 20s); otherwise it is marked terminally 'failed'.
 */
export async function failTask(task: LongTaskRecord, reason: string, retryDelaySeconds = 20) {
  if (task.attempts < task.max_attempts) {
    // Retry path: back onto the queue, scheduled after the delay.
    await db`
      UPDATE long_tasks
      SET status = 'queued',
          error = ${reason},
          scheduled_at = NOW() + (${retryDelaySeconds}::text || ' seconds')::interval,
          updated_at = NOW()
      WHERE id = ${task.id}
    `;
  } else {
    // Terminal path: attempts exhausted.
    await db`
      UPDATE long_tasks
      SET status = 'failed',
          error = ${reason},
          finished_at = NOW(),
          updated_at = NOW()
      WHERE id = ${task.id}
    `;
  }
}

View File

@@ -0,0 +1,52 @@
import { env } from '../config';
import { claimNextTask, completeTask, failTask, heartbeatTask } from './repository';
import { processTask } from './processors';
// Cooperative shutdown flag: runWorkerLoop checks this between tasks.
let keepRunning = true;
/** Asks the worker loop to exit after its current iteration finishes. */
export function stopWorkerLoop() {
keepRunning = false;
}
/** Renders any thrown value as a short string: "Name: message" for Errors. */
function normalizeError(error: unknown) {
  return error instanceof Error
    ? `${error.name}: ${error.message}`
    : String(error);
}
/**
 * Main worker loop: claims queued tasks one at a time, keeps each lease
 * alive with periodic heartbeats, and records completion or failure.
 * Polls every 700ms when the queue is empty; exits when stopWorkerLoop()
 * clears keepRunning.
 *
 * Fix: failTask is now guarded — previously a transient DB error while
 * marking a task failed escaped the catch block, rejected runWorkerLoop,
 * and killed the entire worker process.
 */
export async function runWorkerLoop() {
  console.log('[worker] started');
  while (keepRunning) {
    const task = await claimNextTask();
    if (!task) {
      await Bun.sleep(700);
      continue;
    }
    console.log(`[worker] claimed task ${task.id} (${task.task_type})`);
    // Renew the lease while the task runs so the stale-lease sweep in
    // claimNextTask does not re-queue it under another worker.
    const heartbeatTimer = setInterval(() => {
      void heartbeatTask(task.id).catch((error) => {
        console.error(`[worker] heartbeat failed for ${task.id}`, error);
      });
    }, env.TASK_HEARTBEAT_SECONDS * 1000);
    try {
      const result = await processTask(task);
      await completeTask(task.id, result);
      console.log(`[worker] completed task ${task.id}`);
    } catch (error) {
      const normalized = normalizeError(error);
      console.error(`[worker] failed task ${task.id}`, normalized);
      try {
        await failTask(task, normalized);
      } catch (markError) {
        // A transient DB error here must not kill the worker; the stale-lease
        // sweep in claimNextTask will eventually recover this task.
        console.error(`[worker] could not mark task ${task.id} as failed`, markError);
      }
    } finally {
      clearInterval(heartbeatTimer);
    }
  }
  console.log('[worker] stopping');
}

78
backend/src/types.ts Normal file
View File

@@ -0,0 +1,78 @@
// SEC filing form types the app handles.
export type FilingType = '10-K' | '10-Q' | '8-K';
// Headline financial figures extracted from a filing; null when a figure
// was not found. Units are not established here — presumably USD; confirm
// against the extraction code.
export type FilingMetrics = {
revenue: number | null;
netIncome: number | null;
totalAssets: number | null;
cash: number | null;
debt: number | null;
};
// Row shape of the filings table as returned by the DB driver.
// Dates/timestamps arrive as strings.
export type FilingRecord = {
id: number;
ticker: string;
filing_type: FilingType;
filing_date: string;
accession_number: string; // SEC accession number identifying the filing
cik: string; // SEC Central Index Key of the filer
company_name: string;
filing_url: string | null;
metrics: FilingMetrics | null; // null until extraction has run
analysis: Record<string, unknown> | null; // null until analysis has run
created_at: string;
updated_at: string;
};
// Row shape of a user's portfolio holding. Monetary/quantity columns are
// typed string — presumably Postgres NUMERIC serialized by the driver;
// confirm before doing arithmetic without parsing.
export type HoldingRecord = {
id: number;
user_id: number;
ticker: string;
shares: string;
avg_cost: string;
current_price: string | null; // null until a price refresh has run
market_value: string;
gain_loss: string;
gain_loss_pct: string;
last_price_at: string | null;
created_at: string;
updated_at: string;
};
// Row shape of a user's watchlist entry.
export type WatchlistRecord = {
id: number;
user_id: number;
ticker: string;
company_name: string;
sector: string | null;
created_at: string;
};
// Kinds of background work the task queue processes.
export type TaskType = 'sync_filings' | 'refresh_prices' | 'analyze_filing' | 'portfolio_insights';
// Lifecycle: queued -> running -> completed | failed (failed tasks may be
// re-queued while attempts remain).
export type TaskStatus = 'queued' | 'running' | 'completed' | 'failed';
// Row shape of the long_tasks queue table.
export type LongTaskRecord = {
id: string;
task_type: TaskType;
status: TaskStatus;
priority: number; // higher runs first (claim orders by priority DESC)
payload: Record<string, unknown>;
result: Record<string, unknown> | null; // set on completion
error: string | null; // last failure reason, if any
attempts: number; // claim count so far
max_attempts: number; // retry ceiling
scheduled_at: string; // earliest time the task may run
started_at: string | null;
heartbeat_at: string | null; // worker lease timestamp
finished_at: string | null;
created_by: number | null; // owning user; null = system task
created_at: string;
updated_at: string;
};
// Minimal authenticated-user view attached to a request session.
export type SessionUser = {
id: number;
email: string;
name: string | null;
image: string | null; // avatar URL, if the auth provider supplied one
};

19
backend/src/worker.ts Normal file
View File

@@ -0,0 +1,19 @@
import { runWorkerLoop, stopWorkerLoop } from './tasks/worker-loop';
import { closeDb } from './db';
// Guard so a second SIGINT/SIGTERM cannot run the teardown twice
// (previously a repeated signal raced two closeDb()/process.exit() calls).
let shuttingDown = false;

/**
 * Graceful shutdown: stop claiming new work, give the loop a short grace
 * period to finish its iteration, close the DB pool, then exit 0.
 */
const shutdown = async (signal: string) => {
  if (shuttingDown) return;
  shuttingDown = true;
  console.log(`[worker] received ${signal}`);
  stopWorkerLoop();
  await Bun.sleep(250);
  await closeDb();
  process.exit(0);
};
process.on('SIGINT', () => void shutdown('SIGINT'));
process.on('SIGTERM', () => void shutdown('SIGTERM'));
runWorkerLoop().catch(async (error) => {
  console.error('[worker] fatal error', error);
  await closeDb();
  process.exit(1);
});