From 3a442f81f45a74115d75c6e2480e21301cf1d9e1 Mon Sep 17 00:00:00 2001 From: francy51 Date: Thu, 9 Apr 2026 17:43:55 -0400 Subject: [PATCH] Codex ended here and GLM took over wish me luck --- MosaicIQ/src-tauri/src/news/repository.rs | 46 ++- MosaicIQ/src-tauri/src/research/pipeline.rs | 13 +- MosaicIQ/src-tauri/src/research/repository.rs | 207 ++++++++++++-- MosaicIQ/src-tauri/src/research/service.rs | 146 ++++++---- .../components/Research/ResearchInspector.tsx | 29 +- .../src/components/Research/ResearchMode.tsx | 55 +++- MosaicIQ/src/hooks/useResearchProjection.ts | 57 +++- MosaicIQ/src/lib/researchProjection.test.ts | 133 +++++++++ MosaicIQ/src/lib/researchProjection.ts | 266 ++++++++++++++++++ MosaicIQ/vite.config.ts | 41 +++ 10 files changed, 853 insertions(+), 140 deletions(-) create mode 100644 MosaicIQ/src/lib/researchProjection.test.ts create mode 100644 MosaicIQ/src/lib/researchProjection.ts diff --git a/MosaicIQ/src-tauri/src/news/repository.rs b/MosaicIQ/src-tauri/src/news/repository.rs index 6865091..006dafd 100644 --- a/MosaicIQ/src-tauri/src/news/repository.rs +++ b/MosaicIQ/src-tauri/src/news/repository.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use rusqlite::types::Value; use rusqlite::{params, params_from_iter, Connection, OptionalExtension}; @@ -14,6 +15,7 @@ use crate::news::{NewsError, Result}; #[derive(Clone)] pub struct NewsRepository { db_path: PathBuf, + connections: Arc>>, } impl NewsRepository { @@ -22,9 +24,13 @@ impl NewsRepository { std::fs::create_dir_all(parent)?; } - let repository = Self { db_path }; - let connection = repository.open_connection()?; + let connection = open_connection(&db_path)?; + let repository = Self { + db_path, + connections: Arc::new(Mutex::new(Vec::new())), + }; repository.initialize_schema(&connection)?; + repository.store_connection(connection)?; Ok(repository) } @@ -325,13 +331,24 @@ impl NewsRepository { T: Send + 'static, { let db_path = self.db_path.clone(); + let connections = self.connections.clone(); tokio::task::spawn_blocking(move || { - let mut connection = open_connection(&db_path)?; - task(&mut connection) + let mut connection = take_connection(&connections, &db_path)?; + let result = task(&mut connection); + let store_result = store_connection(&connections, connection); + match (result, store_result) { + (Ok(value), Ok(())) => Ok(value), + (Err(error), Ok(())) => Err(error), + (Ok(_), Err(error)) | (Err(_), Err(error)) => Err(error), + } }) .await .map_err(|error| NewsError::Join(error.to_string()))? } + + fn store_connection(&self, connection: Connection) -> Result<()> { + store_connection(&self.connections, connection) + } } fn sync_sources_in_connection( @@ -388,6 +405,27 @@ fn open_connection(path: &Path) -> Result { Ok(connection) } +fn take_connection(pool: &Mutex>, db_path: &Path) -> Result { + { + let mut guard = pool.lock().map_err(|error| { + NewsError::Config(format!("news connection pool poisoned: {error}")) + })?; + if let Some(connection) = guard.pop() { + return Ok(connection); + } + } + + open_connection(db_path) +} + +fn store_connection(pool: &Mutex>, connection: Connection) -> Result<()> { + let mut guard = pool + .lock() + .map_err(|error| NewsError::Config(format!("news connection pool poisoned: {error}")))?; + guard.push(connection); + Ok(()) +} + fn query_articles( connection: &mut Connection, request: QueryNewsFeedRequest, diff --git a/MosaicIQ/src-tauri/src/research/pipeline.rs b/MosaicIQ/src-tauri/src/research/pipeline.rs index 84e4898..d731d91 100644 --- a/MosaicIQ/src-tauri/src/research/pipeline.rs +++ b/MosaicIQ/src-tauri/src/research/pipeline.rs @@ -66,13 +66,6 @@ impl ResearchPipeline { self.repository.enqueue_jobs(jobs).await } - pub async fn mark_running(&self, mut job: PipelineJob) -> Result { - job.status = JobStatus::Running; - job.attempt_count += 1; - job.updated_at = now_rfc3339(); - self.repository.save_job(job).await - } - pub async fn mark_completed(&self, mut job: PipelineJob) -> Result { job.status = JobStatus::Completed; job.last_error = None; @@ -92,13 +85,13 @@ impl ResearchPipeline { pub async fn mark_failed(&self, mut job: PipelineJob, error: &str) -> Result { job.status = JobStatus::Failed; job.last_error = Some(error.to_string()); - job.next_attempt_at = Some(next_retry_timestamp(job.attempt_count + 1)); + job.next_attempt_at = Some(next_retry_timestamp(job.attempt_count)); job.updated_at = now_rfc3339(); self.repository.save_job(job).await } - pub async fn due_jobs(&self, limit: usize) -> Result> { - self.repository.list_due_jobs(limit).await + pub async fn claim_due_jobs(&self, limit: usize) -> Result> { + self.repository.claim_due_jobs(limit).await } } diff --git a/MosaicIQ/src-tauri/src/research/repository.rs b/MosaicIQ/src-tauri/src/research/repository.rs index 2a12b7c..1fab558 100644 --- a/MosaicIQ/src-tauri/src/research/repository.rs +++ b/MosaicIQ/src-tauri/src/research/repository.rs @@ -1,8 +1,11 @@ //! SQLite-backed persistence for research workspaces, notes, links, ghosts, and jobs. +use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; -use rusqlite::{params, Connection, OptionalExtension}; +use rusqlite::types::Value as SqlValue; +use rusqlite::{params, params_from_iter, Connection, OptionalExtension}; use crate::research::errors::{ResearchError, Result}; use crate::research::types::{ @@ -13,6 +16,7 @@ use crate::research::types::{ #[derive(Clone)] pub struct ResearchRepository { db_path: PathBuf, + connections: Arc>>, } impl ResearchRepository { @@ -21,9 +25,13 @@ impl ResearchRepository { std::fs::create_dir_all(parent)?; } - let repository = Self { db_path }; - let connection = repository.open_connection()?; + let connection = open_connection(&db_path)?; + let repository = Self { + db_path, + connections: Arc::new(Mutex::new(Vec::new())), + }; repository.initialize_schema(&connection)?; + repository.store_connection(connection)?; Ok(repository) } @@ -177,6 +185,64 @@ impl ResearchRepository { .await } + pub async fn save_notes_batch(&self, notes: Vec) -> Result<()> { + self.with_connection(move |connection| { + let transaction = connection.transaction()?; + + for note in ¬es { + transaction.execute( + "INSERT INTO research_notes ( + id, workspace_id, note_type, ticker, source_id, archived, pinned, revision, + evidence_status, created_at, updated_at, entity_json + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) + ON CONFLICT(id) DO UPDATE SET + workspace_id = excluded.workspace_id, + note_type = excluded.note_type, + ticker = excluded.ticker, + source_id = excluded.source_id, + archived = excluded.archived, + pinned = excluded.pinned, + revision = excluded.revision, + evidence_status = excluded.evidence_status, + updated_at = excluded.updated_at, + entity_json = excluded.entity_json", + params![ + ¬e.id, + ¬e.workspace_id, + serde_json::to_string(¬e.note_type)?, + note.ticker.as_deref(), + note.source_id.as_deref(), + i64::from(note.archived), + i64::from(note.pinned), + i64::from(note.revision), + serde_json::to_string(¬e.evidence_status)?, + ¬e.created_at, + ¬e.updated_at, + serde_json::to_string(note)?, + ], + )?; + transaction.execute( + "DELETE FROM research_fts WHERE note_id = ?1", + params![¬e.id], + )?; + transaction.execute( + "INSERT INTO research_fts (note_id, title, cleaned_text, ai_annotation) + VALUES (?1, ?2, ?3, ?4)", + params![ + ¬e.id, + note.title.clone().unwrap_or_default(), + ¬e.cleaned_text, + note.ai_annotation.clone().unwrap_or_default(), + ], + )?; + } + + transaction.commit()?; + Ok(()) + }) + .await + } + pub async fn get_note(&self, note_id: &str) -> Result { let note_id = note_id.to_string(); self.with_connection(move |connection| { @@ -319,20 +385,37 @@ impl ResearchRepository { return Ok(Vec::new()); } - let ids = source_ids.to_vec(); + let mut seen = HashSet::new(); + let ids = source_ids + .iter() + .filter(|source_id| seen.insert((*source_id).clone())) + .cloned() + .collect::>(); self.with_connection(move |connection| { - let mut results = Vec::new(); - let mut statement = - connection.prepare("SELECT entity_json FROM source_records WHERE id = ?1")?; - for source_id in ids { - if let Some(json) = statement - .query_row(params![source_id], |row| row.get::<_, String>(0)) - .optional()? - { - results.push(serde_json::from_str(&json)?); - } - } - Ok(results) + let placeholders = std::iter::repeat_n("?", ids.len()) + .collect::>() + .join(", "); + let query = + format!("SELECT id, entity_json FROM source_records WHERE id IN ({placeholders})"); + let query_params = ids.iter().cloned().map(SqlValue::Text).collect::>(); + let mut statement = connection.prepare(&query)?; + let rows = statement.query_map(params_from_iter(query_params.iter()), |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)) + })?; + let rows = rows.collect::, _>>()?; + let sources_by_id = rows + .into_iter() + .map(|(id, json)| { + serde_json::from_str::(&json) + .map(|source| (id, source)) + .map_err(ResearchError::from) + }) + .collect::>>()?; + + Ok(ids + .into_iter() + .filter_map(|source_id| sources_by_id.get(&source_id).cloned()) + .collect()) }) .await } @@ -621,29 +704,57 @@ impl ResearchRepository { Ok(value) } - pub async fn list_due_jobs(&self, limit: usize) -> Result> { + pub async fn claim_due_jobs(&self, limit: usize) -> Result> { + let now = crate::research::util::now_rfc3339(); self.with_connection(move |connection| { - let mut statement = connection.prepare( + let transaction = connection.transaction()?; + let mut statement = transaction.prepare( "SELECT entity_json FROM pipeline_jobs WHERE status IN (?1, ?2) - AND (next_attempt_at IS NULL OR next_attempt_at <= datetime('now')) + AND (next_attempt_at IS NULL OR next_attempt_at <= ?3) ORDER BY updated_at ASC - LIMIT ?3", + LIMIT ?4", )?; let rows = statement.query_map( params![ serde_json::to_string(&JobStatus::Queued)?, serde_json::to_string(&JobStatus::Failed)?, + now.clone(), i64::try_from(limit) .map_err(|error| ResearchError::Validation(error.to_string()))?, ], |row| row.get::<_, String>(0), )?; - rows.collect::, _>>()? + let mut jobs = rows + .collect::, _>>()? .into_iter() - .map(|json| serde_json::from_str(&json).map_err(ResearchError::from)) - .collect() + .map(|json| serde_json::from_str::(&json).map_err(ResearchError::from)) + .collect::>>()?; + + drop(statement); + + for job in &mut jobs { + job.status = JobStatus::Running; + job.attempt_count += 1; + job.updated_at = now.clone(); + transaction.execute( + "UPDATE pipeline_jobs + SET status = ?2, + updated_at = ?3, + entity_json = ?4 + WHERE id = ?1", + params![ + &job.id, + serde_json::to_string(&job.status)?, + &job.updated_at, + serde_json::to_string(job)?, + ], + )?; + } + + transaction.commit()?; + Ok(jobs) }) .await } @@ -759,10 +870,6 @@ impl ResearchRepository { .await } - fn open_connection(&self) -> Result { - open_connection(&self.db_path) - } - fn initialize_schema(&self, connection: &Connection) -> Result<()> { connection.execute_batch( "PRAGMA foreign_keys = ON; @@ -799,6 +906,8 @@ impl ResearchRepository { ON research_notes (workspace_id, note_type, archived); CREATE INDEX IF NOT EXISTS research_notes_workspace_ticker_updated_idx ON research_notes (workspace_id, ticker, updated_at DESC); + CREATE INDEX IF NOT EXISTS research_notes_workspace_source_type_idx + ON research_notes (workspace_id, source_id, note_type); CREATE TABLE IF NOT EXISTS note_links ( id TEXT PRIMARY KEY, workspace_id TEXT NOT NULL REFERENCES research_workspaces(id) ON DELETE CASCADE, @@ -831,6 +940,10 @@ impl ResearchRepository { ); CREATE INDEX IF NOT EXISTS source_records_workspace_ticker_kind_published_idx ON source_records (workspace_id, ticker, kind, published_at DESC); + CREATE INDEX IF NOT EXISTS source_records_workspace_checksum_idx + ON source_records (workspace_id, json_extract(entity_json, '$.checksum')); + CREATE INDEX IF NOT EXISTS source_records_workspace_accession_idx + ON source_records (workspace_id, json_extract(entity_json, '$.filingAccession')); CREATE TABLE IF NOT EXISTS source_excerpts ( id TEXT PRIMARY KEY, source_id TEXT NOT NULL, @@ -865,6 +978,10 @@ impl ResearchRepository { created_at TEXT NOT NULL, entity_json TEXT NOT NULL ); + CREATE INDEX IF NOT EXISTS audit_events_entity_created_idx + ON audit_events (entity_id, created_at ASC); + CREATE INDEX IF NOT EXISTS audit_events_workspace_created_idx + ON audit_events (workspace_id, created_at ASC); CREATE VIRTUAL TABLE IF NOT EXISTS research_fts USING fts5(note_id UNINDEXED, title, cleaned_text, ai_annotation);", )?; Ok(()) @@ -876,13 +993,24 @@ impl ResearchRepository { T: Send + 'static, { let db_path = self.db_path.clone(); + let connections = self.connections.clone(); tokio::task::spawn_blocking(move || { - let mut connection = open_connection(&db_path)?; - task(&mut connection) + let mut connection = take_connection(&connections, &db_path)?; + let result = task(&mut connection); + let store_result = store_connection(&connections, connection); + match (result, store_result) { + (Ok(value), Ok(())) => Ok(value), + (Err(error), Ok(())) => Err(error), + (Ok(_), Err(error)) | (Err(_), Err(error)) => Err(error), + } }) .await .map_err(|error| ResearchError::Join(error.to_string()))? } + + fn store_connection(&self, connection: Connection) -> Result<()> { + store_connection(&self.connections, connection) + } } fn open_connection(path: &Path) -> Result { @@ -895,6 +1023,27 @@ fn open_connection(path: &Path) -> Result { Ok(connection) } +fn take_connection(pool: &Mutex>, db_path: &Path) -> Result { + { + let mut guard = pool.lock().map_err(|error| { + ResearchError::Validation(format!("research connection pool poisoned: {error}")) + })?; + if let Some(connection) = guard.pop() { + return Ok(connection); + } + } + + open_connection(db_path) +} + +fn store_connection(pool: &Mutex>, connection: Connection) -> Result<()> { + let mut guard = pool.lock().map_err(|error| { + ResearchError::Validation(format!("research connection pool poisoned: {error}")) + })?; + guard.push(connection); + Ok(()) +} + #[cfg(test)] mod tests { use tempfile::tempdir; diff --git a/MosaicIQ/src-tauri/src/research/service.rs b/MosaicIQ/src-tauri/src/research/service.rs index c0fd2bc..60b22b9 100644 --- a/MosaicIQ/src-tauri/src/research/service.rs +++ b/MosaicIQ/src-tauri/src/research/service.rs @@ -1,5 +1,6 @@ //! Public orchestration service for the research subsystem. +use std::collections::HashSet; use std::sync::Arc; use serde_json::Value; @@ -38,6 +39,7 @@ pub struct ResearchService { ai_gateway: Arc, emitter: ResearchEventEmitter, settings: AgentSettingsService, + job_processor_lock: Arc>, } impl Clone for ResearchService { @@ -48,6 +50,7 @@ impl Clone for ResearchService { ai_gateway: self.ai_gateway.clone(), emitter: self.emitter.clone(), settings: self.settings.clone(), + job_processor_lock: self.job_processor_lock.clone(), } } } @@ -73,6 +76,7 @@ impl ResearchService { ai_gateway, emitter: ResearchEventEmitter::new(app_handle), settings: AgentSettingsService::new(app_handle), + job_processor_lock: Arc::new(tokio::sync::Mutex::new(())), }) } @@ -207,6 +211,7 @@ impl ResearchService { source_note.source_id.iter().cloned().collect(), ) .await?; + self.emitter.note_updated(&source_note); } } @@ -428,22 +433,17 @@ impl ResearchService { &self, request: GetWorkspaceProjectionRequest, ) -> Result { - let workspace = self.repository.get_workspace(&request.workspace_id).await?; - let notes = self - .repository - .list_notes(&request.workspace_id, false, None) - .await?; - let links = self - .repository - .list_links(&request.workspace_id, None) - .await?; - let ghosts = self - .repository - .list_ghosts(&request.workspace_id, false) - .await?; + let (workspace, notes, links, ghosts) = tokio::try_join!( + self.repository.get_workspace(&request.workspace_id), + self.repository + .list_notes(&request.workspace_id, false, None), + self.repository.list_links(&request.workspace_id, None), + self.repository.list_ghosts(&request.workspace_id, false), + )?; + let active_view = request.view.unwrap_or(workspace.default_view); Ok(build_workspace_projection( - workspace.clone(), - request.view.unwrap_or(workspace.default_view), + workspace, + active_view, notes, links, ghosts, @@ -559,27 +559,27 @@ impl ResearchService { request: GetNoteAuditTrailRequest, ) -> Result { let note = self.repository.get_note(&request.note_id).await?; - let links = self - .repository - .list_links(¬e.workspace_id, Some(¬e.id)) - .await?; - let ghosts = self - .repository - .list_ghosts(¬e.workspace_id, true) - .await? + let (links, ghosts, audit_events) = tokio::try_join!( + self.repository + .list_links(¬e.workspace_id, Some(¬e.id)), + self.repository.list_ghosts(¬e.workspace_id, true), + self.repository.list_audit_events_for_entity(¬e.id), + )?; + let ghosts = ghosts .into_iter() .filter(|ghost| { ghost.supporting_note_ids.contains(¬e.id) || ghost.contradicting_note_ids.contains(¬e.id) }) .collect::>(); - let mut source_ids = note.source_id.iter().cloned().collect::>(); - source_ids.extend(ghosts.iter().flat_map(|ghost| ghost.source_ids.clone())); + let source_ids = dedupe_ids( + note.source_id.iter().cloned().chain( + ghosts + .iter() + .flat_map(|ghost| ghost.source_ids.iter().cloned()), + ), + ); let sources = self.repository.list_sources_by_ids(&source_ids).await?; - let audit_events = self - .repository - .list_audit_events_for_entity(¬e.id) - .await?; let memo_blocks = build_memo_blocks(std::slice::from_ref(¬e), &ghosts); Ok(NoteAuditTrail { @@ -619,30 +619,40 @@ impl ResearchService { } pub async fn process_due_jobs(&self) -> Result<()> { - let jobs = self.pipeline.due_jobs(16).await?; - for job in jobs { - let running = self.pipeline.mark_running(job).await?; - self.emitter.job_updated(&running); - let result = self.process_job(&running).await; - match result { - Ok(()) => { - let completed = self.pipeline.mark_completed(running).await?; - self.emitter.job_updated(&completed); - } - Err(error) => { - let failed = if running.attempt_count >= running.max_attempts { - self.pipeline - .mark_skipped(running, &error.to_string()) - .await? - } else { - self.pipeline - .mark_failed(running, &error.to_string()) - .await? - }; - self.emitter.job_updated(&failed); + let Ok(_guard) = self.job_processor_lock.try_lock() else { + return Ok(()); + }; + + loop { + let jobs = self.pipeline.claim_due_jobs(16).await?; + if jobs.is_empty() { + break; + } + + for running in jobs { + self.emitter.job_updated(&running); + let result = self.process_job(&running).await; + match result { + Ok(()) => { + let completed = self.pipeline.mark_completed(running).await?; + self.emitter.job_updated(&completed); + } + Err(error) => { + let failed = if running.attempt_count >= running.max_attempts { + self.pipeline + .mark_skipped(running, &error.to_string()) + .await? + } else { + self.pipeline + .mark_failed(running, &error.to_string()) + .await? + }; + self.emitter.job_updated(&failed); + } } } } + Ok(()) } @@ -733,20 +743,30 @@ impl ResearchService { .replace_links_for_workspace(&workspace_id, links.clone()) .await?; let now = now_rfc3339(); - let mut note_map = notes + let changed_notes = notes .into_iter() - .map(|mut note| { - note.inferred_links = saved_links + .filter_map(|mut note| { + let inferred_links = saved_links .iter() .filter(|link| link.from_note_id == note.id || link.to_note_id == note.id) .map(|link| link.id.clone()) - .collect(); + .collect::>(); + if note.inferred_links == inferred_links { + return None; + } + + note.inferred_links = inferred_links; note.last_linked_at = Some(now.clone()); - (note.id.clone(), note) + Some(note) }) - .collect::>(); - for note in note_map.values_mut() { - self.repository.save_note(note.clone()).await?; + .collect::>(); + if !changed_notes.is_empty() { + self.repository + .save_notes_batch(changed_notes.clone()) + .await?; + for note in &changed_notes { + self.emitter.note_updated(note); + } } for link in &saved_links { self.record_audit( @@ -882,6 +902,16 @@ impl ResearchService { } } +fn dedupe_ids(ids: I) -> Vec +where + I: IntoIterator, +{ + let mut seen = HashSet::new(); + ids.into_iter() + .filter(|id| seen.insert(id.clone())) + .collect() +} + fn payload_required_str(payload: &Value, key: &str) -> Result { payload .get(key) diff --git a/MosaicIQ/src/components/Research/ResearchInspector.tsx b/MosaicIQ/src/components/Research/ResearchInspector.tsx index b7e92b4..dc10ad9 100644 --- a/MosaicIQ/src/components/Research/ResearchInspector.tsx +++ b/MosaicIQ/src/components/Research/ResearchInspector.tsx @@ -1,6 +1,5 @@ import React, { useEffect, useState } from 'react'; import { Pin, Archive, RefreshCw, Sparkles } from 'lucide-react'; -import { researchBridge } from '../../lib/researchBridge'; import type { GhostNote, NoteAuditTrail, @@ -14,6 +13,9 @@ import { GHOST_CLASS_LABELS, NOTE_TYPE_LABELS } from './primitives/researchMeta' interface ResearchInspectorProps { note: ResearchNote | null; ghost: GhostNote | null; + auditTrail: NoteAuditTrail | null; + isLoadingAuditTrail: boolean; + onRefreshAuditTrail: () => void; onUpdateNote: (noteId: string, patch: { rawText?: string; title?: string; @@ -44,6 +46,9 @@ const editableNoteTypes: NoteType[] = [ export const ResearchInspector: React.FC = ({ note, ghost, + auditTrail, + isLoadingAuditTrail, + onRefreshAuditTrail, onUpdateNote, onArchiveNote, onPromoteNote, @@ -52,24 +57,15 @@ export const ResearchInspector: React.FC = ({ const [draftTitle, setDraftTitle] = useState(''); const [draftBody, setDraftBody] = useState(''); const [draftType, setDraftType] = useState('claim'); - const [auditTrail, setAuditTrail] = useState(null); - const [isLoadingAudit, setIsLoadingAudit] = useState(false); useEffect(() => { if (!note) { - setAuditTrail(null); return; } setDraftTitle(note.title ?? ''); setDraftBody(note.rawText); setDraftType(note.noteType); - setIsLoadingAudit(true); - void researchBridge.getNoteAuditTrail({ noteId: note.id }).then((trail) => { - setAuditTrail(trail); - }).finally(() => { - setIsLoadingAudit(false); - }); }, [note]); if (!note && !ghost) { @@ -227,21 +223,12 @@ export const ResearchInspector: React.FC = ({
Audit Trail
- {isLoadingAudit ? 'Loading evidence trace...' : `${auditTrail?.links.length ?? 0} links, ${auditTrail?.sources.length ?? 0} sources`} + {isLoadingAuditTrail ? 'Loading evidence trace...' : `${auditTrail?.links.length ?? 0} links, ${auditTrail?.sources.length ?? 0} sources`}