diff --git a/MosaicIQ/src-tauri/src/agent/gateway.rs b/MosaicIQ/src-tauri/src/agent/gateway.rs index 682caf7..17044c9 100644 --- a/MosaicIQ/src-tauri/src/agent/gateway.rs +++ b/MosaicIQ/src-tauri/src/agent/gateway.rs @@ -25,6 +25,7 @@ pub struct AgentToolRuntimeContext { pub stream_emitter: Arc>, pub command_executor: Arc, pub pending_approvals: Arc, + pub request_id: String, pub workspace_id: String, } @@ -67,6 +68,7 @@ impl ChatGateway for RigChatGateway { stream_emitter: tool_runtime.stream_emitter.clone(), command_executor: tool_runtime.command_executor.clone(), pending_approvals: tool_runtime.pending_approvals.clone(), + request_id: tool_runtime.request_id.clone(), workspace_id: tool_runtime.workspace_id.clone(), }; let client = build_openai_client(&runtime)?; diff --git a/MosaicIQ/src-tauri/src/agent/mod.rs b/MosaicIQ/src-tauri/src/agent/mod.rs index 99f9184..6f46e32 100644 --- a/MosaicIQ/src-tauri/src/agent/mod.rs +++ b/MosaicIQ/src-tauri/src/agent/mod.rs @@ -16,8 +16,8 @@ pub use stream_events::AgentStreamEmitter; pub use types::{ default_task_defaults, AgentConfigStatus, AgentProviderKind, AgentProviderStatuses, AgentRuntimeConfig, AgentStoredSettings, AgentStreamItemEvent, AgentStreamItemKind, - AgentTaskRoute, ChatPanelContext, ChatPromptRequest, ChatStreamStart, OllamaProviderSettings, - PreparedChatTurn, ProviderConfigStatus, RemoteProviderSettings, + AgentTaskRoute, CancelChatStreamRequest, ChatPanelContext, ChatPromptRequest, ChatStreamStart, + OllamaProviderSettings, PreparedChatTurn, ProviderConfigStatus, RemoteProviderSettings, ResolveAgentToolApprovalRequest, SaveAgentRuntimeConfigRequest, TaskProfile, UpdateRemoteApiKeyRequest, AGENT_SETTINGS_STORE_PATH, DEFAULT_OLLAMA_BASE_URL, DEFAULT_OLLAMA_COMPAT_API_KEY, DEFAULT_REMOTE_BASE_URL, DEFAULT_REMOTE_MODEL, diff --git a/MosaicIQ/src-tauri/src/agent/stream_events.rs b/MosaicIQ/src-tauri/src/agent/stream_events.rs index fb2350d..5cd960b 100644 --- a/MosaicIQ/src-tauri/src/agent/stream_events.rs +++ b/MosaicIQ/src-tauri/src/agent/stream_events.rs @@ -93,6 +93,10 @@ impl AgentStreamEmitter { self.emit(self.event(AgentStreamItemKind::StreamComplete)) } + pub fn cancelled(&self) -> Result<(), AppError> { + self.emit(self.event(AgentStreamItemKind::Cancelled)) + } + pub fn error(&self, message: String) -> Result<(), AppError> { self.emit(self.event(AgentStreamItemKind::Error).with_error(message)) } diff --git a/MosaicIQ/src-tauri/src/agent/tools/terminal_command.rs b/MosaicIQ/src-tauri/src/agent/tools/terminal_command.rs index d6859e7..aff7f32 100644 --- a/MosaicIQ/src-tauri/src/agent/tools/terminal_command.rs +++ b/MosaicIQ/src-tauri/src/agent/tools/terminal_command.rs @@ -41,6 +41,7 @@ pub struct RunTerminalCommandTool { pub stream_emitter: Arc>, pub command_executor: Arc, pub pending_approvals: Arc, + pub request_id: String, pub workspace_id: String, } @@ -143,7 +144,9 @@ impl RunTerminalCommandTool { } async fn await_approval(&self, command: &str) -> Result { - let (approval_id, receiver) = self.pending_approvals.register()?; + let (approval_id, receiver) = self + .pending_approvals + .register_for_request(Some(self.request_id.clone()))?; let (title, message) = approval_prompt(command); self.stream_emitter.approval_required( @@ -354,6 +357,7 @@ mod tests { )), command_executor: executor.clone(), pending_approvals: Arc::new(PendingAgentToolApprovals::new()), + request_id: "request-1".to_string(), workspace_id: "workspace-1".to_string(), }; @@ -387,6 +391,7 @@ mod tests { )), command_executor: executor.clone(), pending_approvals: approvals.clone(), + request_id: "request-1".to_string(), workspace_id: "workspace-1".to_string(), }; @@ -425,6 +430,7 @@ mod tests { )), command_executor: executor.clone(), pending_approvals: approvals.clone(), + request_id: "request-1".to_string(), workspace_id: "workspace-1".to_string(), }; @@ -463,6 +469,7 @@ mod tests { )), command_executor: executor.clone(), pending_approvals: approvals.clone(), + request_id: "request-1".to_string(), workspace_id: "workspace-1".to_string(), }; diff --git a/MosaicIQ/src-tauri/src/agent/types.rs b/MosaicIQ/src-tauri/src/agent/types.rs index 4a9eef3..82c760a 100644 --- a/MosaicIQ/src-tauri/src/agent/types.rs +++ b/MosaicIQ/src-tauri/src/agent/types.rs @@ -93,6 +93,13 @@ pub struct ChatStreamStart { pub session_id: String, } +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CancelChatStreamRequest { + pub workspace_id: String, + pub request_id: String, +} + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum AgentStreamItemKind { @@ -102,6 +109,7 @@ pub enum AgentStreamItemKind { ToolResult, ApprovalRequired, StreamComplete, + Cancelled, Error, } diff --git a/MosaicIQ/src-tauri/src/commands/terminal.rs b/MosaicIQ/src-tauri/src/commands/terminal.rs index 4b85afa..c7e631b 100644 --- a/MosaicIQ/src-tauri/src/commands/terminal.rs +++ b/MosaicIQ/src-tauri/src/commands/terminal.rs @@ -1,8 +1,9 @@ use tauri::Manager; +use tokio::sync::oneshot; use crate::agent::{ - AgentStreamEmitter, AgentToolRuntimeContext, ChatGateway, ChatPromptRequest, ChatStreamStart, - ResolveAgentToolApprovalRequest, + AgentStreamEmitter, AgentToolRuntimeContext, CancelChatStreamRequest, ChatGateway, + ChatPromptRequest, ChatStreamStart, ResolveAgentToolApprovalRequest, }; use crate::state::AppState; use crate::terminal::{ @@ -56,8 +57,13 @@ pub async fn start_chat_stream( }; let app_handle = app.clone(); + let active_chat_streams = state.active_chat_streams.clone(); let command_executor = state.command_service.clone(); let pending_approvals = state.pending_agent_tool_approvals.clone(); + let (cancel_sender, cancel_receiver) = oneshot::channel(); + active_chat_streams + .register(request_id.clone(), cancel_sender) + .map_err(|error| error.to_string())?; let stream_emitter = std::sync::Arc::new(AgentStreamEmitter::new( app_handle.clone(), prepared_turn.workspace_id.clone(), @@ -65,10 +71,9 @@ pub async fn start_chat_stream( prepared_turn.session_id.clone(), )); tauri::async_runtime::spawn(async move { - // Resolve the upstream stream outside the mutex so long-running provider I/O - // does not block other settings reads or chat requests. - let reply = match gateway - .stream_chat( + let request_id = request_id.clone(); + let reply = tokio::select! { + reply = gateway.stream_chat( prepared_turn.runtime.clone(), prepared_turn.prompt.clone(), prepared_turn.context_messages.clone(), @@ -76,12 +81,22 @@ pub async fn start_chat_stream( AgentToolRuntimeContext { stream_emitter: stream_emitter.clone(), command_executor, - pending_approvals, + pending_approvals: pending_approvals.clone(), + request_id: request_id.clone(), workspace_id: prepared_turn.workspace_id.clone(), }, - ) - .await - { + ) => reply, + _ = cancel_receiver => { + pending_approvals.cancel_for_request(&request_id); + let _ = active_chat_streams.remove(&request_id); + let _ = stream_emitter.cancelled(); + return; + } + }; + + let _ = active_chat_streams.remove(&request_id); + + let reply = match reply { Ok(reply) => reply, Err(error) => { let _ = stream_emitter.error(error.to_string()); @@ -94,13 +109,35 @@ pub async fn start_chat_stream( let state = app_handle.state::(); let mut agent = state.agent.lock().await; let _ = agent.record_assistant_reply(&prepared_turn.session_id, &reply); - let _ = stream_emitter.stream_complete(); }); Ok(start) } +/// Cancels an active streaming chat request if it is still running. +#[tauri::command] +pub async fn cancel_chat_stream( + state: tauri::State<'_, AppState>, + request: CancelChatStreamRequest, +) -> Result<(), String> { + let workspace_id = request.workspace_id.trim(); + let request_id = request.request_id.trim(); + if workspace_id.is_empty() { + return Err("workspace id cannot be empty".to_string()); + } + if request_id.is_empty() { + return Err("request id cannot be empty".to_string()); + } + + state.pending_agent_tool_approvals.cancel_for_request(request_id); + state + .active_chat_streams + .cancel(request_id) + .map_err(|error| error.to_string())?; + Ok(()) +} + /// Resolves a pending agent-triggered command approval. #[tauri::command] pub async fn resolve_agent_tool_approval( diff --git a/MosaicIQ/src-tauri/src/lib.rs b/MosaicIQ/src-tauri/src/lib.rs index ae316d2..911097c 100644 --- a/MosaicIQ/src-tauri/src/lib.rs +++ b/MosaicIQ/src-tauri/src/lib.rs @@ -42,6 +42,7 @@ pub fn run() { commands::terminal::execute_terminal_command, commands::terminal::lookup_company, commands::terminal::start_chat_stream, + commands::terminal::cancel_chat_stream, commands::terminal::resolve_agent_tool_approval, commands::settings::get_agent_config_status, commands::settings::save_agent_runtime_config, diff --git a/MosaicIQ/src-tauri/src/state.rs b/MosaicIQ/src-tauri/src/state.rs index 5417cd6..317a7b8 100644 --- a/MosaicIQ/src-tauri/src/state.rs +++ b/MosaicIQ/src-tauri/src/state.rs @@ -21,9 +21,14 @@ use crate::terminal::security_lookup::SecurityLookup; use crate::terminal::types::AvailableXbrlConcept; use crate::terminal::TerminalCommandService; +struct PendingApprovalEntry { + request_id: Option, + sender: oneshot::Sender, +} + pub struct PendingAgentToolApprovals { next_approval_id: AtomicU64, - senders: Mutex>>, + senders: Mutex>, } impl PendingAgentToolApprovals { @@ -34,7 +39,10 @@ impl PendingAgentToolApprovals { } } - pub fn register(&self) -> Result<(String, oneshot::Receiver), AppError> { + pub fn register_for_request( + &self, + request_id: Option, + ) -> Result<(String, oneshot::Receiver), AppError> { let approval_id = format!( "approval-{}", self.next_approval_id.fetch_add(1, Ordering::Relaxed) @@ -44,19 +52,23 @@ impl PendingAgentToolApprovals { .senders .lock() .map_err(|_| AppError::InvalidSettings("approval state is unavailable".to_string()))?; - senders.insert(approval_id.clone(), sender); + senders.insert( + approval_id.clone(), + PendingApprovalEntry { request_id, sender }, + ); Ok((approval_id, receiver)) } pub fn resolve(&self, approval_id: &str, approved: bool) -> Result<(), AppError> { - let sender = self + let entry = self .senders .lock() .map_err(|_| AppError::InvalidSettings("approval state is unavailable".to_string()))? .remove(approval_id) .ok_or_else(|| AppError::AgentToolApprovalNotFound(approval_id.to_string()))?; - sender + entry + .sender .send(approved) .map_err(|_| AppError::AgentToolApprovalNotFound(approval_id.to_string())) } @@ -66,6 +78,16 @@ impl PendingAgentToolApprovals { senders.remove(approval_id); } } + + pub fn cancel_for_request(&self, request_id: &str) { + if let Ok(mut senders) = self.senders.lock() { + senders.retain(|_, entry| { + entry.request_id + .as_deref() + .is_none_or(|entry_request_id| entry_request_id != request_id) + }); + } + } } impl Default for PendingAgentToolApprovals { @@ -74,6 +96,55 @@ impl Default for PendingAgentToolApprovals { } } +pub struct ActiveChatStreams { + senders: Mutex>>, +} + +impl ActiveChatStreams { + pub fn new() -> Self { + Self { + senders: Mutex::new(HashMap::new()), + } + } + + pub fn register( + &self, + request_id: String, + sender: oneshot::Sender<()>, + ) -> Result<(), AppError> { + let mut senders = self + .senders + .lock() + .map_err(|_| AppError::InvalidSettings("active stream state is unavailable".to_string()))?; + senders.insert(request_id, sender); + Ok(()) + } + + pub fn cancel(&self, request_id: &str) -> Result { + let sender = self + .senders + .lock() + .map_err(|_| AppError::InvalidSettings("active stream state is unavailable".to_string()))? + .remove(request_id); + + Ok(sender.is_some_and(|sender| sender.send(()).is_ok())) + } + + pub fn remove(&self, request_id: &str) -> Result<(), AppError> { + self.senders + .lock() + .map_err(|_| AppError::InvalidSettings("active stream state is unavailable".to_string()))? + .remove(request_id); + Ok(()) + } +} + +impl Default for ActiveChatStreams { + fn default() -> Self { + Self::new() + } +} + struct SettingsBackedSecUserAgentProvider { settings: AgentSettingsService, } @@ -111,6 +182,8 @@ pub struct AppState { pub edgar_lookup: Arc, /// Cache of XBRL concepts extracted from filings, keyed by uppercase ticker. pub concept_cache: Arc>>>, + /// Active chat streams keyed by request id for cancellation. + pub active_chat_streams: Arc, /// Pending approvals for agent-triggered mutating commands. pub pending_agent_tool_approvals: Arc, } @@ -183,7 +256,49 @@ impl AppState { mappings_service, edgar_lookup, concept_cache: Arc::new(Mutex::new(HashMap::new())), + active_chat_streams: Arc::new(ActiveChatStreams::new()), pending_agent_tool_approvals: Arc::new(PendingAgentToolApprovals::new()), }) } } + +#[cfg(test)] +mod tests { + use tokio::sync::oneshot; + + use super::{ActiveChatStreams, PendingAgentToolApprovals}; + + #[test] + fn cancel_for_request_drops_matching_pending_approvals() { + let approvals = PendingAgentToolApprovals::new(); + let (_approval_id, receiver) = approvals + .register_for_request(Some("request-1".to_string())) + .expect("approval should register"); + + approvals.cancel_for_request("request-1"); + + assert!(receiver.blocking_recv().is_err()); + } + + #[test] + fn cancel_chat_stream_removes_registered_sender() { + let active_streams = ActiveChatStreams::new(); + let (cancel_sender, cancel_receiver) = oneshot::channel(); + + active_streams + .register("request-1".to_string(), cancel_sender) + .expect("stream should register"); + + assert!( + active_streams + .cancel("request-1") + .expect("stream cancellation should succeed") + ); + assert!(cancel_receiver.blocking_recv().is_ok()); + assert!( + !active_streams + .cancel("request-1") + .expect("second cancellation should be a no-op") + ); + } +} diff --git a/MosaicIQ/src/App.tsx b/MosaicIQ/src/App.tsx index ad0d3ee..8d555ba 100644 --- a/MosaicIQ/src/App.tsx +++ b/MosaicIQ/src/App.tsx @@ -101,9 +101,10 @@ function App() { const [researchIntent, setResearchIntent] = React.useState(null); const [agentStatus, setAgentStatus] = React.useState(null); const [sidebarOpen, setSidebarOpen] = React.useState(true); - const outputRef = useRef(null); const hasAutoLoadedPortfolioRef = useRef(false); const activePortfolioWorkflow = portfolioWorkflow.readWorkflow(tabs.activeWorkspaceId); + const activeTerminalState = tabs.activeWorkspace?.terminal; + const activePendingApproval = activeTerminalState?.pendingApproval ?? null; const researchWorkspaces = useResearchWorkspaces(); const { captureNote: captureResearchNote } = useResearchCaptureFlow({ ensureWorkspace: researchWorkspaces.ensureWorkspace, @@ -165,14 +166,13 @@ function App() { ); const { + cancelWorkspaceRequest, commandInputRef, clearWorkspaceSession, handleClearPortfolioAction, handleCommand, handleStartPortfolioAction, handleUpdatePortfolioDraft, - isProcessing, - pendingAgentApproval, resetCommandIndex, resolvePendingAgentApproval, runCommand, @@ -188,6 +188,28 @@ function App() { clearWorkspaceSession(tabs.activeWorkspaceId); }, [clearWorkspaceSession, tabs.activeWorkspaceId]); + const handleCloseWorkspace = useCallback( + (workspaceId: string) => { + const workspace = tabs.workspaces.find((candidate) => candidate.id === workspaceId); + if (!workspace) { + return; + } + + void (async () => { + if (workspace.terminal.activeRequest?.kind === 'chat') { + try { + await cancelWorkspaceRequest(workspaceId); + } catch { + // Closing the workspace should not be blocked by cancellation transport errors. + } + } + + tabs.closeWorkspace(workspaceId); + })(); + }, + [cancelWorkspaceRequest, tabs], + ); + const handleCreateWorkspace = useCallback(() => { setActiveView('terminal'); tabs.createWorkspace(); @@ -197,9 +219,9 @@ function App() { activeView, activeWorkspaceId: tabs.activeWorkspaceId, commandInputRef, - isProcessing, + isProcessing: activeTerminalState?.isProcessing ?? false, onClearTerminal: clearTerminal, - onCloseWorkspace: tabs.closeWorkspace, + onCloseWorkspace: handleCloseWorkspace, onCreateWorkspace: handleCreateWorkspace, onOpenResearch: () => { void handleOpenResearch(); @@ -231,15 +253,25 @@ function App() { }, []); useEffect(() => { - if (hasAutoLoadedPortfolioRef.current || activeView !== 'terminal' || isProcessing) { + if ( + hasAutoLoadedPortfolioRef.current || + activeView !== 'terminal' || + activeTerminalState?.isProcessing + ) { return; } hasAutoLoadedPortfolioRef.current = true; // Reset portfolio loading state before triggering command portfolioWorkflow.resetPortfolioLoading(tabs.activeWorkspaceId); - void handleCommand('/portfolio'); - }, [activeView, handleCommand, isProcessing, portfolioWorkflow, tabs.activeWorkspaceId]); + void handleCommand(tabs.activeWorkspaceId, '/portfolio'); + }, [ + activeTerminalState?.isProcessing, + activeView, + handleCommand, + portfolioWorkflow, + tabs.activeWorkspaceId, + ]); // Sync portfolio names from workflow to workspaces React.useEffect(() => { @@ -258,6 +290,8 @@ function App() { name: workspace.name, isActive: workspace.id === tabs.activeWorkspaceId, portfolioName: workflow.portfolioName, + isProcessing: workspace.terminal.isProcessing, + hasPendingApproval: workspace.terminal.pendingApproval !== null, }; }); @@ -276,14 +310,14 @@ function App() { resolvePendingAgentApproval(true)} - onCancel={() => resolvePendingAgentApproval(false)} + onConfirm={() => resolvePendingAgentApproval(tabs.activeWorkspaceId, true)} + onCancel={() => resolvePendingAgentApproval(tabs.activeWorkspaceId, false)} /> ); @@ -303,7 +337,9 @@ function App() { void handleOpenSettings(); }} onToggle={() => setSidebarOpen((prev) => !prev)} - onCommand={handleCommand} + onCommand={(command) => { + void handleCommand(tabs.activeWorkspaceId, command); + }} portfolio={activePortfolioWorkflow.portfolioSnapshot} tickerHistory={tickerHistory.history} isTickerHistoryLoaded={tickerHistory.isLoaded} @@ -320,7 +356,7 @@ function App() { setActiveView('terminal'); tabs.setActiveWorkspace(id); }} - onTabClose={(id) => tabs.closeWorkspace(id)} + onTabClose={handleCloseWorkspace} onNewTab={handleCreateWorkspace} onTabRename={(id, name) => tabs.renameWorkspace(id, name)} /> @@ -345,12 +381,18 @@ function App() { ) : ( { + void handleCommand(tabs.activeWorkspaceId, command); + }} + onInputDraftChange={(value) => + tabs.setWorkspaceInputDraft(tabs.activeWorkspaceId, value) + } + onRunCommand={(command) => runCommand(tabs.activeWorkspaceId, command)} onStartPortfolioAction={handleStartPortfolioAction} onUpdatePortfolioDraft={handleUpdatePortfolioDraft} onClearPortfolioAction={handleClearPortfolioAction} @@ -359,19 +401,24 @@ function App() { researchWorkspaces={researchWorkspaces.workspaces} activeResearchWorkspaceId={researchWorkspaces.activeWorkspaceId} onCaptureResearchNote={handleCaptureResearchNote} + scrollTop={activeTerminalState?.scrollTop ?? 0} + stickToBottom={activeTerminalState?.stickToBottom ?? true} + collapsedThinkingEntryIds={activeTerminalState?.collapsedThinkingEntryIds ?? []} + onScrollStateChange={tabs.setWorkspaceScrollState} + onToggleThinkingEntry={tabs.toggleWorkspaceThinkingCollapse} /> )} resolvePendingAgentApproval(true)} - onCancel={() => resolvePendingAgentApproval(false)} + onConfirm={() => resolvePendingAgentApproval(tabs.activeWorkspaceId, true)} + onCancel={() => resolvePendingAgentApproval(tabs.activeWorkspaceId, false)} /> ); diff --git a/MosaicIQ/src/components/TabBar/TabBar.tsx b/MosaicIQ/src/components/TabBar/TabBar.tsx index 3dc1b74..5e37131 100644 --- a/MosaicIQ/src/components/TabBar/TabBar.tsx +++ b/MosaicIQ/src/components/TabBar/TabBar.tsx @@ -5,6 +5,8 @@ export interface Tab { name: string; isActive: boolean; portfolioName?: string | null; + isProcessing?: boolean; + hasPendingApproval?: boolean; } interface TabBarProps { @@ -87,9 +89,17 @@ export const TabBar: React.FC = ({ /> ) : (
- - {tab.name} - +
+ + {tab.name} + + {tab.isProcessing ? ( + + ) : null} + {tab.hasPendingApproval ? ( + + ) : null} +
{tab.portfolioName && ( {tab.portfolioName} diff --git a/MosaicIQ/src/components/Terminal/CommandInput.tsx b/MosaicIQ/src/components/Terminal/CommandInput.tsx index 816687f..33ffb06 100644 --- a/MosaicIQ/src/components/Terminal/CommandInput.tsx +++ b/MosaicIQ/src/components/Terminal/CommandInput.tsx @@ -22,6 +22,9 @@ import { } from '../../lib/terminalShadow'; interface CommandInputProps { + workspaceId: string; + value: string; + onValueChange: (value: string) => void; onSubmit: (command: string) => void; onStartPortfolioAction: (action: PortfolioAction) => void; onUpdatePortfolioDraft: (patch: Partial) => void; @@ -118,6 +121,9 @@ const buildGeneratedCommand = ( export const CommandInput = React.forwardRef( ( { + workspaceId, + value, + onValueChange, onSubmit, onStartPortfolioAction, onUpdatePortfolioDraft, @@ -132,7 +138,6 @@ export const CommandInput = React.forwardRef { - const [input, setInput] = useState(''); const [showSuggestions, setShowSuggestions] = useState(false); const [activeSuggestionIndex, setActiveSuggestionIndex] = useState(0); const [shadowCollapsed, setShadowCollapsed] = useState(false); @@ -148,8 +153,8 @@ export const CommandInput = React.forwardRef (actionComposerActive ? null : resolveTerminalShadow(input)), - [actionComposerActive, input], + () => (actionComposerActive ? null : resolveTerminalShadow(value)), + [actionComposerActive, value], ); useEffect(() => { @@ -160,14 +165,20 @@ export const CommandInput = React.forwardRef { + setShowSuggestions(value.startsWith('/')); + setActiveSuggestionIndex(0); + setShadowCollapsed(false); + }, [value, workspaceId]); const suggestionMatches = useMemo( () => TERMINAL_COMMAND_SUGGESTIONS.filter( - (suggestion) => !input || suggestion.command.startsWith(input), + (suggestion) => !value || suggestion.command.startsWith(value), ), - [input], + [value], ); useEffect(() => { @@ -185,21 +196,21 @@ export const CommandInput = React.forwardRef ({ focusWithText: (text: string) => { - setInput(text); + onValueChange(text); setShowSuggestions(text.startsWith('/')); setActiveSuggestionIndex(0); resetCommandIndex(); inputRef.current?.focus(); }, }), - [resetCommandIndex], + [onValueChange, resetCommandIndex], ); const handleSubmit = () => { - const trimmed = input.trim(); + const trimmed = value.trim(); if (trimmed && !isProcessing) { onSubmit(trimmed); - setInput(''); + onValueChange(''); setShowSuggestions(false); setActiveSuggestionIndex(0); resetCommandIndex(); @@ -213,7 +224,7 @@ export const CommandInput = React.forwardRef) => { - setInput(event.target.value); + onValueChange(event.target.value); setShowSuggestions(event.target.value.startsWith('/')); setActiveSuggestionIndex(0); }; @@ -413,7 +424,7 @@ export const CommandInput = React.forwardRef
- ) : input ? ( + ) : value ? ( ) : null} @@ -564,7 +575,7 @@ export const CommandInput = React.forwardRef { - setInput(example); + onValueChange(example); setShowSuggestions(false); setActiveSuggestionIndex(0); resetCommandIndex(); diff --git a/MosaicIQ/src/components/Terminal/Terminal.tsx b/MosaicIQ/src/components/Terminal/Terminal.tsx index 9c1c574..527340e 100644 --- a/MosaicIQ/src/components/Terminal/Terminal.tsx +++ b/MosaicIQ/src/components/Terminal/Terminal.tsx @@ -20,11 +20,13 @@ import { TerminalOutput } from './TerminalOutput'; import { CommandInput, CommandInputHandle } from './CommandInput'; interface TerminalProps { + workspaceId: string; history: TerminalEntry[]; + inputDraft: string; isProcessing: boolean; - outputRef: React.RefObject; inputRef: React.RefObject; onSubmit: (command: string) => void; + onInputDraftChange: (value: string) => void; onRunCommand: (command: string) => void; onStartPortfolioAction: ( action: PortfolioAction, @@ -42,14 +44,24 @@ interface TerminalProps { explicitWorkspaceId?: string | null; autoCreateFromTicker?: boolean; }) => Promise; + scrollTop: number; + stickToBottom: boolean; + collapsedThinkingEntryIds: string[]; + onScrollStateChange: ( + workspaceId: string, + scrollState: { scrollTop: number; stickToBottom: boolean }, + ) => void; + onToggleThinkingEntry: (workspaceId: string, entryId: string) => void; } export const Terminal: React.FC = ({ + workspaceId, history, + inputDraft, isProcessing, - outputRef, inputRef, onSubmit, + onInputDraftChange, onRunCommand, onStartPortfolioAction, onUpdatePortfolioDraft, @@ -59,6 +71,11 @@ export const Terminal: React.FC = ({ researchWorkspaces, activeResearchWorkspaceId, onCaptureResearchNote, + scrollTop, + stickToBottom, + collapsedThinkingEntryIds, + onScrollStateChange, + onToggleThinkingEntry, }) => { const [terminalCapture, setTerminalCapture] = React.useState<{ key: string; @@ -108,6 +125,9 @@ export const Terminal: React.FC = ({
= ({ {/* Terminal Output */} diff --git a/MosaicIQ/src/components/Terminal/TerminalOutput.tsx b/MosaicIQ/src/components/Terminal/TerminalOutput.tsx index 4a42e70..2b60f0b 100644 --- a/MosaicIQ/src/components/Terminal/TerminalOutput.tsx +++ b/MosaicIQ/src/components/Terminal/TerminalOutput.tsx @@ -19,8 +19,16 @@ import { DividendsPanel } from '../Panels/DividendsPanel'; import { EarningsPanel } from '../Panels/EarningsPanel'; interface TerminalOutputProps { + workspaceId: string; history: TerminalEntry[]; - outputRef: React.RefObject; + scrollTop: number; + stickToBottom: boolean; + collapsedThinkingEntryIds: string[]; + onScrollStateChange: (workspaceId: string, scrollState: { + scrollTop: number; + stickToBottom: boolean; + }) => void; + onToggleThinkingEntry: (workspaceId: string, entryId: string) => void; onRunCommand: (command: string) => void; onStartPortfolioAction: ( action: PortfolioAction, @@ -29,26 +37,25 @@ interface TerminalOutputProps { } export const TerminalOutput: React.FC = ({ + workspaceId, history, - outputRef, + scrollTop, + stickToBottom, + collapsedThinkingEntryIds, + onScrollStateChange, + onToggleThinkingEntry, onRunCommand, onStartPortfolioAction, }) => { + const outputRef = useRef(null); const contentRef = useRef(null); - const shouldStickToBottomRef = useRef(true); - const [collapsedThinkingEntries, setCollapsedThinkingEntries] = useState>(new Set()); + const shouldStickToBottomRef = useRef(stickToBottom); + const [hasRestoredScroll, setHasRestoredScroll] = useState(false); + const collapsedThinkingEntries = new Set(collapsedThinkingEntryIds); const toggleThinkingEntry = useCallback((entryId: string) => { - setCollapsedThinkingEntries((prev) => { - const next = new Set(prev); - if (next.has(entryId)) { - next.delete(entryId); - } else { - next.add(entryId); - } - return next; - }); - }, []); + onToggleThinkingEntry(workspaceId, entryId); + }, [onToggleThinkingEntry, workspaceId]); const isNearBottom = useCallback((element: HTMLDivElement) => { const distanceFromBottom = @@ -69,6 +76,14 @@ export const TerminalOutput: React.FC = ({ }); }, [outputRef]); + useEffect(() => { + shouldStickToBottomRef.current = stickToBottom; + }, [stickToBottom]); + + useEffect(() => { + setHasRestoredScroll(false); + }, [workspaceId]); + useEffect(() => { const element = outputRef.current; if (!element) { @@ -76,7 +91,12 @@ export const TerminalOutput: React.FC = ({ } const handleScroll = () => { - shouldStickToBottomRef.current = isNearBottom(element); + const nextStickToBottom = isNearBottom(element); + shouldStickToBottomRef.current = nextStickToBottom; + onScrollStateChange(workspaceId, { + scrollTop: element.scrollTop, + stickToBottom: nextStickToBottom, + }); }; handleScroll(); @@ -85,7 +105,19 @@ export const TerminalOutput: React.FC = ({ return () => { element.removeEventListener('scroll', handleScroll); }; - }, [isNearBottom, outputRef]); + }, [isNearBottom, onScrollStateChange, workspaceId]); + + useEffect(() => { + const element = outputRef.current; + if (!element || hasRestoredScroll) { + return; + } + + requestAnimationFrame(() => { + element.scrollTop = shouldStickToBottomRef.current ? element.scrollHeight : scrollTop; + setHasRestoredScroll(true); + }); + }, [hasRestoredScroll, history, scrollTop]); useEffect(() => { scrollToBottom(); diff --git a/MosaicIQ/src/hooks/useTabs.ts b/MosaicIQ/src/hooks/useTabs.ts index 5d5acfd..f40f268 100644 --- a/MosaicIQ/src/hooks/useTabs.ts +++ b/MosaicIQ/src/hooks/useTabs.ts @@ -1,11 +1,37 @@ import { useState, useCallback } from 'react'; -import { TerminalEntry } from '../types/terminal'; +import { + PendingAgentApproval, + TerminalEntry, + TerminalRequestState, + WorkspaceTerminalState, +} from '../types/terminal'; + +const DEFAULT_WELCOME_MESSAGE = + 'MosaicIQ Financial Terminal v1.0\nUse /portfolio to open portfolio tools.\nSlash commands (/) clear the panel. Natural language builds a conversation.'; + +const createInitialTerminalState = (entryId: string): WorkspaceTerminalState => ({ + history: [ + { + id: entryId, + type: 'system', + content: DEFAULT_WELCOME_MESSAGE, + timestamp: new Date(), + }, + ], + chatSessionId: undefined, + inputDraft: '', + isProcessing: false, + activeRequest: null, + pendingApproval: null, + scrollTop: 0, + stickToBottom: true, + collapsedThinkingEntryIds: [], +}); export interface Workspace { id: string; name: string; - history: TerminalEntry[]; - chatSessionId?: string; + terminal: WorkspaceTerminalState; createdAt: Date; portfolioName?: string | null; } @@ -15,17 +41,9 @@ export const useTabs = () => { { id: '1', name: 'Terminal 1', - history: [ - { - id: 'welcome', - type: 'system', - content: 'MosaicIQ Financial Terminal v1.0\nUse /portfolio to open portfolio tools.\nSlash commands (/) clear the panel. Natural language builds a conversation.', - timestamp: new Date() - } - ], - chatSessionId: undefined, - createdAt: new Date() - } + terminal: createInitialTerminalState('welcome'), + createdAt: new Date(), + }, ]); const [activeWorkspaceId, setActiveWorkspaceId] = useState('1'); @@ -35,16 +53,8 @@ export const useTabs = () => { const newWorkspace: Workspace = { id: Date.now().toString(), name: `Terminal ${workspaces.length + 1}`, - history: [ - { - id: `welcome-${Date.now()}`, - type: 'system', - content: 'MosaicIQ Financial Terminal v1.0\nUse /portfolio to open portfolio tools.\nSlash commands (/) clear the panel. Natural language builds a conversation.', - timestamp: new Date() - } - ], - chatSessionId: undefined, - createdAt: new Date() + terminal: createInitialTerminalState(`welcome-${Date.now()}`), + createdAt: new Date(), }; setWorkspaces(prev => [...prev, newWorkspace]); @@ -73,24 +83,33 @@ export const useTabs = () => { setActiveWorkspaceId(id); }, []); + const updateWorkspaceTerminal = useCallback( + ( + id: string, + updater: (terminal: WorkspaceTerminalState) => WorkspaceTerminalState, + ) => { + setWorkspaces((prev) => + prev.map((workspace) => + workspace.id === id + ? { ...workspace, terminal: updater(workspace.terminal) } + : workspace, + ), + ); + }, + [], + ); + const updateWorkspaceHistory = useCallback((id: string, history: TerminalEntry[]) => { - setWorkspaces(prev => - prev.map(w => - w.id === id ? { ...w, history } : w - ) - ); + updateWorkspaceTerminal(id, (terminal) => ({ ...terminal, history })); }, []); const appendWorkspaceEntry = useCallback((id: string, entry: TerminalEntry) => { // Appending in place keeps a stable entry id available for later stream updates. - setWorkspaces((prev) => - prev.map((workspace) => - workspace.id === id - ? { ...workspace, history: [...workspace.history, entry] } - : workspace, - ), - ); - }, []); + updateWorkspaceTerminal(id, (terminal) => ({ + ...terminal, + history: [...terminal.history, entry], + })); + }, [updateWorkspaceTerminal]); const updateWorkspaceEntry = useCallback( ( @@ -99,38 +118,87 @@ export const useTabs = () => { updater: (entry: TerminalEntry) => TerminalEntry, ) => { // Update a single entry without rebuilding unrelated workspaces. - setWorkspaces((prev) => - prev.map((workspace) => - workspace.id === id - ? { - ...workspace, - history: workspace.history.map((entry) => - entry.id === entryId ? updater(entry) : entry, - ), - } - : workspace, + updateWorkspaceTerminal(id, (terminal) => ({ + ...terminal, + history: terminal.history.map((entry) => + entry.id === entryId ? updater(entry) : entry, ), - ); + })); }, - [], + [updateWorkspaceTerminal], ); const clearWorkspace = useCallback((id: string) => { - setWorkspaces((prev) => - prev.map((workspace) => - workspace.id === id ? { ...workspace, history: [] } : workspace, - ), - ); - }, []); + updateWorkspaceTerminal(id, (terminal) => ({ + ...terminal, + history: [], + chatSessionId: undefined, + inputDraft: '', + isProcessing: false, + activeRequest: null, + pendingApproval: null, + scrollTop: 0, + stickToBottom: true, + collapsedThinkingEntryIds: [], + })); + }, [updateWorkspaceTerminal]); const setWorkspaceSession = useCallback((id: string, chatSessionId?: string) => { // Session ids are scoped per workspace so each tab can maintain an independent conversation. - setWorkspaces((prev) => - prev.map((workspace) => - workspace.id === id ? { ...workspace, chatSessionId } : workspace, - ), - ); - }, []); + updateWorkspaceTerminal(id, (terminal) => ({ ...terminal, chatSessionId })); + }, [updateWorkspaceTerminal]); + + const setWorkspaceInputDraft = useCallback((id: string, inputDraft: string) => { + updateWorkspaceTerminal(id, (terminal) => ({ ...terminal, inputDraft })); + }, [updateWorkspaceTerminal]); + + const setWorkspaceProcessing = useCallback((id: string, isProcessing: boolean) => { + updateWorkspaceTerminal(id, (terminal) => ({ ...terminal, isProcessing })); + }, [updateWorkspaceTerminal]); + + const setWorkspaceActiveRequest = useCallback( + (id: string, activeRequest: TerminalRequestState | null) => { + updateWorkspaceTerminal(id, (terminal) => ({ ...terminal, activeRequest })); + }, + [updateWorkspaceTerminal], + ); + + const setWorkspacePendingApproval = useCallback( + (id: string, pendingApproval: PendingAgentApproval | null) => { + updateWorkspaceTerminal(id, (terminal) => ({ ...terminal, pendingApproval })); + }, + [updateWorkspaceTerminal], + ); + + const setWorkspaceScrollState = useCallback( + ( + id: string, + scrollState: Pick, + ) => { + updateWorkspaceTerminal(id, (terminal) => ({ + ...terminal, + scrollTop: scrollState.scrollTop, + stickToBottom: scrollState.stickToBottom, + })); + }, + [updateWorkspaceTerminal], + ); + + const toggleWorkspaceThinkingCollapse = useCallback( + (id: string, entryId: string) => { + updateWorkspaceTerminal(id, (terminal) => { + const collapsedThinkingEntryIds = terminal.collapsedThinkingEntryIds.includes(entryId) + ? terminal.collapsedThinkingEntryIds.filter((currentId) => currentId !== entryId) + : [...terminal.collapsedThinkingEntryIds, entryId]; + + return { + ...terminal, + collapsedThinkingEntryIds, + }; + }); + }, + [updateWorkspaceTerminal], + ); const renameWorkspace = useCallback((id: string, name: string) => { setWorkspaces(prev => @@ -160,6 +228,13 @@ export const useTabs = () => { updateWorkspaceEntry, clearWorkspace, setWorkspaceSession, + setWorkspaceInputDraft, + setWorkspaceProcessing, + setWorkspaceActiveRequest, + setWorkspacePendingApproval, + setWorkspaceScrollState, + toggleWorkspaceThinkingCollapse, + updateWorkspaceTerminal, renameWorkspace, setPortfolioName, }; diff --git a/MosaicIQ/src/hooks/useTerminalOrchestrator.ts b/MosaicIQ/src/hooks/useTerminalOrchestrator.ts index 0b2fa83..82159f9 100644 --- a/MosaicIQ/src/hooks/useTerminalOrchestrator.ts +++ b/MosaicIQ/src/hooks/useTerminalOrchestrator.ts @@ -1,8 +1,8 @@ -import { useCallback, useEffect, useEffectEvent, useRef, useState } from 'react'; +import { useCallback, useEffect, useEffectEvent, useRef } from 'react'; import { CommandInputHandle } from '../components/Terminal/CommandInput'; import { createEntry } from './useTerminal'; -import { usePortfolioWorkflow, isPortfolioCommand } from './usePortfolioWorkflow'; -import { useTabs } from './useTabs'; +import { isPortfolioCommand, usePortfolioWorkflow } from './usePortfolioWorkflow'; +import { useTabs, Workspace } from './useTabs'; import { useTickerHistory } from './useTickerHistory'; import { extractChatPanelContext } from '../lib/chatPanelContext'; import { mappingsBridge } from '../lib/mappingsBridge'; @@ -12,9 +12,10 @@ import { extractTickerSymbolFromResponse, resolveTickerCommandFallback, } from '../lib/tickerHistory'; -import { terminalBridge } from '../lib/terminalBridge'; +import { createTerminalRequestId, terminalBridge } from '../lib/terminalBridge'; import { AgentStreamItemEvent, + PendingAgentApproval, PortfolioAction, PortfolioActionDraft, PortfolioActionSeed, @@ -23,15 +24,6 @@ import { type AppView = 'terminal' | 'research' | 'settings'; -export interface PendingAgentApproval { - approvalId: string; - command: string; - requestId: string; - workspaceId: string; - title: string; - message: string; -} - interface UseTerminalOrchestratorArgs { tabs: ReturnType; tickerHistory: ReturnType; @@ -39,31 +31,83 @@ interface UseTerminalOrchestratorArgs { setActiveView: React.Dispatch>; } +const COMMAND_FAILURE_MESSAGE = 'Command execution failed.'; +const CHAT_FAILURE_MESSAGE = 'Chat stream failed.'; +const CHAT_CANCELLED_MESSAGE = 'Chat cancelled.'; + export const useTerminalOrchestrator = ({ tabs, tickerHistory, portfolioWorkflow, setActiveView, }: UseTerminalOrchestratorArgs) => { - const [isProcessing, setIsProcessing] = useState(false); - const [pendingAgentApproval, setPendingAgentApproval] = - useState(null); const commandHistoryRefs = useRef>({}); const commandIndexRefs = useRef>({}); const commandInputRef = useRef(null); const refreshInFlightKeysRef = useRef>(new Set()); + const workspacesRef = useRef(tabs.workspaces); + + useEffect(() => { + workspacesRef.current = tabs.workspaces; + }, [tabs.workspaces]); + + const findWorkspace = useCallback((workspaceId: string) => { + return workspacesRef.current.find((workspace) => workspace.id === workspaceId) ?? null; + }, []); + + const isActiveRequest = useCallback( + (workspaceId: string, requestId: string) => { + const workspace = findWorkspace(workspaceId); + return workspace?.terminal.activeRequest?.requestId === requestId; + }, + [findWorkspace], + ); + + const finishWorkspaceRequest = useCallback( + (workspaceId: string, requestId: string) => { + if (!isActiveRequest(workspaceId, requestId)) { + return false; + } + + tabs.updateWorkspaceTerminal(workspaceId, (terminal) => ({ + ...terminal, + isProcessing: false, + activeRequest: null, + pendingApproval: null, + })); + return true; + }, + [isActiveRequest, tabs], + ); + + const appendWorkspaceSystemMessage = useCallback( + (workspaceId: string, content: string) => { + if (!findWorkspace(workspaceId)) { + return; + } + + tabs.appendWorkspaceEntry( + workspaceId, + createEntry({ + type: 'system', + content, + }), + ); + }, + [findWorkspace, tabs], + ); const pushCommandHistory = useCallback((workspaceId: string, command: string) => { if (!commandHistoryRefs.current[workspaceId]) { commandHistoryRefs.current[workspaceId] = []; } + commandHistoryRefs.current[workspaceId].push(command); commandIndexRefs.current[workspaceId] = -1; }, []); const clearWorkspaceSession = useCallback((workspaceId: string) => { tabs.clearWorkspace(workspaceId); - tabs.setWorkspaceSession(workspaceId, undefined); commandIndexRefs.current[workspaceId] = -1; }, [tabs]); @@ -73,6 +117,10 @@ export const useTerminalOrchestrator = ({ command: string | undefined, response: TerminalCommandResponse, ) => { + if (!findWorkspace(workspaceId)) { + return; + } + tabs.appendWorkspaceEntry( workspaceId, createEntry( @@ -91,7 +139,7 @@ export const useTerminalOrchestrator = ({ void tickerHistory.recordTicker(tickerSymbol); } }, - [portfolioWorkflow, tabs, tickerHistory], + [findWorkspace, portfolioWorkflow, tabs, tickerHistory], ); const replaceEarningsPanelResponse = useCallback( @@ -100,6 +148,10 @@ export const useTerminalOrchestrator = ({ panelEntryId: string, response: TerminalCommandResponse, ) => { + if (!findWorkspace(workspaceId)) { + return; + } + tabs.updateWorkspaceEntry(workspaceId, panelEntryId, (entry) => ({ ...entry, content: response.kind === 'panel' ? response.panel : entry.content, @@ -111,267 +163,294 @@ export const useTerminalOrchestrator = ({ void tickerHistory.recordTicker(tickerSymbol); } }, - [tabs, tickerHistory], + [findWorkspace, tabs, tickerHistory], ); - const refreshEarningsPanels = useEffectEvent( - async (event: MappingsUpdatedEvent) => { - await refreshAffectedEarningsPanels({ - workspaces: tabs.workspaces, - event, - execute: (panel) => - terminalBridge.executeTerminalCommand({ - workspaceId: panel.workspaceId, - input: panel.sourceCommand, - }), - replacePanel: (panel, response) => { - replaceEarningsPanelResponse(panel.workspaceId, panel.panelEntryId, response); - }, - reportFailure: (panel, reason) => { - tabs.appendWorkspaceEntry( - panel.workspaceId, - createEntry({ - type: 'system', - content: reason, - }), - ); - }, - inFlightKeys: refreshInFlightKeysRef.current, - }); - }, - ); - - const handleCommand = useCallback(async (command: string) => { - const trimmedCommand = command.trim(); - const latestTicker = tickerHistory.history[0]?.company.symbol; - const resolvedCommand = resolveTickerCommandFallback(trimmedCommand, latestTicker); - const workspaceId = tabs.activeWorkspaceId; - const currentWorkspace = tabs.workspaces.find( - (workspace) => workspace.id === workspaceId, - ); - const isSlashCommand = resolvedCommand.startsWith('/'); - - if (!resolvedCommand) { - return; - } - - setActiveView('terminal'); - - if (resolvedCommand === '/clear' || resolvedCommand.toLowerCase() === 'clear') { - clearWorkspaceSession(workspaceId); - return; - } - - pushCommandHistory(workspaceId, resolvedCommand); - setIsProcessing(true); - if (isPortfolioCommand(resolvedCommand)) { - portfolioWorkflow.noteCommandStart(workspaceId, resolvedCommand); - } - - if (isSlashCommand) { - const commandEntry = createEntry({ type: 'command', content: resolvedCommand }); - clearWorkspaceSession(workspaceId); - tabs.appendWorkspaceEntry(workspaceId, commandEntry); - - try { - const response = await terminalBridge.executeTerminalCommand({ - workspaceId, - input: resolvedCommand, - }); - - appendResolvedCommandResponse(workspaceId, resolvedCommand, response); - } catch (error) { - tabs.appendWorkspaceEntry( - workspaceId, - createEntry({ - type: 'error', - content: error instanceof Error ? error.message : 'Command execution failed.', - }), - ); - portfolioWorkflow.noteCommandError(workspaceId, resolvedCommand); - } finally { - setIsProcessing(false); - } - - return; - } - - const panelContext = extractChatPanelContext(currentWorkspace?.history ?? []); - const commandEntry = createEntry({ type: 'command', content: resolvedCommand }); - let lastSequenceSeen = 0; - let activeTextEntryId: string | null = null; - let activeThinkingEntryId: string | null = null; - - tabs.appendWorkspaceEntry(workspaceId, commandEntry); - - const appendStreamEntry = ( - type: 'response' | 'thinking', - renderMode: 'plain' | 'markdown', - ) => { - const entry = createEntry({ - type, - content: '', - renderMode, - }); - tabs.appendWorkspaceEntry(workspaceId, entry); - - if (type === 'response') { - activeTextEntryId = entry.id; - } else { - activeThinkingEntryId = entry.id; - } - - return entry.id; - }; - - const closeTextSegment = () => { - activeTextEntryId = null; - }; - - const closeThinkingSegment = () => { - activeThinkingEntryId = null; - }; - - const appendDeltaToEntry = ( - entryId: string, - delta: string, - renderMode: 'plain' | 'markdown', - ) => { - tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({ - ...entry, - content: typeof entry.content === 'string' ? `${entry.content}${delta}` : delta, - renderMode, - timestamp: new Date(), - })); - }; - - const processStreamItem = ( - event: Omit & { - response?: TerminalCommandResponse; + const refreshEarningsPanels = useEffectEvent(async (event: MappingsUpdatedEvent) => { + await refreshAffectedEarningsPanels({ + workspaces: workspacesRef.current, + event, + execute: (panel) => + terminalBridge.executeTerminalCommand({ + workspaceId: panel.workspaceId, + input: panel.sourceCommand, + }), + replacePanel: (panel, response) => { + replaceEarningsPanelResponse(panel.workspaceId, panel.panelEntryId, response); }, - ) => { - if (event.sequence <= lastSequenceSeen) { + reportFailure: (panel, reason) => { + appendWorkspaceSystemMessage(panel.workspaceId, reason); + }, + inFlightKeys: refreshInFlightKeysRef.current, + }); + }); + + const handleCommand = useCallback( + async (workspaceId: string, command: string) => { + const trimmedCommand = command.trim(); + const latestTicker = tickerHistory.history[0]?.company.symbol; + const resolvedCommand = resolveTickerCommandFallback(trimmedCommand, latestTicker); + const currentWorkspace = findWorkspace(workspaceId); + const isSlashCommand = resolvedCommand.startsWith('/'); + + if (!resolvedCommand || !currentWorkspace || currentWorkspace.terminal.isProcessing) { return; } - lastSequenceSeen = event.sequence; - switch (event.kind) { - case 'reasoning_delta': { - if (!event.delta) { + setActiveView('terminal'); + + if (resolvedCommand === '/clear' || resolvedCommand.toLowerCase() === 'clear') { + clearWorkspaceSession(workspaceId); + return; + } + + pushCommandHistory(workspaceId, resolvedCommand); + tabs.setWorkspaceInputDraft(workspaceId, ''); + tabs.setWorkspacePendingApproval(workspaceId, null); + + if (isPortfolioCommand(resolvedCommand)) { + portfolioWorkflow.noteCommandStart(workspaceId, resolvedCommand); + } + + if (isSlashCommand) { + const requestId = createTerminalRequestId(); + const commandEntry = createEntry({ type: 'command', content: resolvedCommand }); + + clearWorkspaceSession(workspaceId); + tabs.appendWorkspaceEntry(workspaceId, commandEntry); + tabs.setWorkspaceProcessing(workspaceId, true); + tabs.setWorkspaceActiveRequest(workspaceId, { kind: 'slash', requestId }); + + try { + const response = await terminalBridge.executeTerminalCommand({ + workspaceId, + input: resolvedCommand, + }); + + if (!finishWorkspaceRequest(workspaceId, requestId)) { return; } - closeTextSegment(); - const entryId = - activeThinkingEntryId ?? appendStreamEntry('thinking', 'plain'); - appendDeltaToEntry(entryId, event.delta, 'plain'); - return; - } - case 'text_delta': { - if (!event.delta) { + + appendResolvedCommandResponse(workspaceId, resolvedCommand, response); + } catch (error) { + if (!finishWorkspaceRequest(workspaceId, requestId)) { return; } - closeThinkingSegment(); - const entryId = - activeTextEntryId ?? appendStreamEntry('response', 'markdown'); - appendDeltaToEntry(entryId, event.delta, 'markdown'); - return; - } - case 'tool_command': { - if (!event.command) { - return; - } - closeThinkingSegment(); - closeTextSegment(); + tabs.appendWorkspaceEntry( workspaceId, - createEntry({ type: 'command', content: event.command }), + createEntry({ + type: 'error', + content: + error instanceof Error ? error.message : COMMAND_FAILURE_MESSAGE, + }), ); - return; + portfolioWorkflow.noteCommandError(workspaceId, resolvedCommand); } - case 'tool_result': { - if (!event.command || !event.response) { - return; - } - closeThinkingSegment(); - closeTextSegment(); - appendResolvedCommandResponse(workspaceId, event.command, event.response); - return; - } - case 'approval_required': { - if (!event.approvalId || !event.command || !event.title || !event.message) { - return; - } - setPendingAgentApproval({ - approvalId: event.approvalId, - command: event.command, - requestId: event.requestId, - workspaceId: event.workspaceId, - title: event.title, - message: event.message, - }); - return; - } - case 'stream_complete': { - tabs.setWorkspaceSession(workspaceId, event.sessionId); - closeThinkingSegment(); - closeTextSegment(); - setIsProcessing(false); - return; - } - case 'error': { - const entryId = activeTextEntryId ?? appendStreamEntry('response', 'plain'); - closeThinkingSegment(); - activeTextEntryId = entryId; - tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({ - ...entry, - type: 'error', - content: event.errorMessage ?? 'Chat stream failed.', - renderMode: 'plain', - timestamp: new Date(), - })); - setIsProcessing(false); - return; - } - default: - return; + + return; } - }; - try { - const start = await terminalBridge.startChatStream( - { - workspaceId, - sessionId: currentWorkspace?.chatSessionId, - prompt: resolvedCommand, - agentProfile: 'toolUse', - panelContext, - }, - { - onStreamItem: processStreamItem, - }, - ); + const requestId = createTerminalRequestId(); + const panelContext = extractChatPanelContext(currentWorkspace.terminal.history); + const commandEntry = createEntry({ type: 'command', content: resolvedCommand }); + let lastSequenceSeen = 0; + let activeTextEntryId: string | null = null; + let activeThinkingEntryId: string | null = null; - tabs.setWorkspaceSession(workspaceId, start.sessionId); - } catch (error) { - const entryId = activeTextEntryId ?? appendStreamEntry('response', 'plain'); - tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({ - ...entry, - type: 'error', - content: error instanceof Error ? error.message : 'Chat stream failed.', - renderMode: 'plain', - timestamp: new Date(), - })); - setIsProcessing(false); - } - }, [ - appendResolvedCommandResponse, - clearWorkspaceSession, - portfolioWorkflow, - pushCommandHistory, - setActiveView, - tabs, - tickerHistory, - ]); + tabs.appendWorkspaceEntry(workspaceId, commandEntry); + tabs.setWorkspaceProcessing(workspaceId, true); + tabs.setWorkspaceActiveRequest(workspaceId, { kind: 'chat', requestId }); + + const appendStreamEntry = ( + type: 'response' | 'thinking', + renderMode: 'plain' | 'markdown', + ) => { + const entry = createEntry({ + type, + content: '', + renderMode, + }); + tabs.appendWorkspaceEntry(workspaceId, entry); + + if (type === 'response') { + activeTextEntryId = entry.id; + } else { + activeThinkingEntryId = entry.id; + } + + return entry.id; + }; + + const closeTextSegment = () => { + activeTextEntryId = null; + }; + + const closeThinkingSegment = () => { + activeThinkingEntryId = null; + }; + + const appendDeltaToEntry = ( + entryId: string, + delta: string, + renderMode: 'plain' | 'markdown', + ) => { + tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({ + ...entry, + content: typeof entry.content === 'string' ? `${entry.content}${delta}` : delta, + renderMode, + timestamp: new Date(), + })); + }; + + const updateErrorEntry = (message: string) => { + const entryId = activeTextEntryId ?? appendStreamEntry('response', 'plain'); + closeThinkingSegment(); + activeTextEntryId = entryId; + tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({ + ...entry, + type: 'error', + content: message, + renderMode: 'plain', + timestamp: new Date(), + })); + }; + + const processStreamItem = ( + event: Omit & { + response?: TerminalCommandResponse; + }, + ) => { + if (!isActiveRequest(workspaceId, requestId) || event.sequence <= lastSequenceSeen) { + return; + } + + lastSequenceSeen = event.sequence; + + switch (event.kind) { + case 'reasoning_delta': { + if (!event.delta) { + return; + } + closeTextSegment(); + const entryId = + activeThinkingEntryId ?? appendStreamEntry('thinking', 'plain'); + appendDeltaToEntry(entryId, event.delta, 'plain'); + return; + } + case 'text_delta': { + if (!event.delta) { + return; + } + closeThinkingSegment(); + const entryId = + activeTextEntryId ?? appendStreamEntry('response', 'markdown'); + appendDeltaToEntry(entryId, event.delta, 'markdown'); + return; + } + case 'tool_command': { + if (!event.command) { + return; + } + closeThinkingSegment(); + closeTextSegment(); + tabs.appendWorkspaceEntry( + workspaceId, + createEntry({ type: 'command', content: event.command }), + ); + return; + } + case 'tool_result': { + if (!event.command || !event.response) { + return; + } + closeThinkingSegment(); + closeTextSegment(); + appendResolvedCommandResponse(workspaceId, event.command, event.response); + return; + } + case 'approval_required': { + if (!event.approvalId || !event.command || !event.title || !event.message) { + return; + } + + const approval: PendingAgentApproval = { + approvalId: event.approvalId, + command: event.command, + requestId: event.requestId, + workspaceId: event.workspaceId, + title: event.title, + message: event.message, + }; + tabs.setWorkspacePendingApproval(workspaceId, approval); + return; + } + case 'stream_complete': { + tabs.setWorkspaceSession(workspaceId, event.sessionId); + closeThinkingSegment(); + closeTextSegment(); + finishWorkspaceRequest(workspaceId, requestId); + return; + } + case 'cancelled': { + closeThinkingSegment(); + closeTextSegment(); + if (finishWorkspaceRequest(workspaceId, requestId)) { + appendWorkspaceSystemMessage(workspaceId, CHAT_CANCELLED_MESSAGE); + } + return; + } + case 'error': { + updateErrorEntry(event.errorMessage ?? CHAT_FAILURE_MESSAGE); + finishWorkspaceRequest(workspaceId, requestId); + return; + } + default: + return; + } + }; + + try { + const start = await terminalBridge.startChatStream( + { + requestId, + workspaceId, + sessionId: currentWorkspace.terminal.chatSessionId, + prompt: resolvedCommand, + agentProfile: 'toolUse', + panelContext, + }, + { + onStreamItem: processStreamItem, + }, + ); + + if (isActiveRequest(workspaceId, requestId)) { + tabs.setWorkspaceSession(workspaceId, start.sessionId); + } + } catch (error) { + if (!isActiveRequest(workspaceId, requestId)) { + return; + } + + updateErrorEntry(error instanceof Error ? error.message : CHAT_FAILURE_MESSAGE); + finishWorkspaceRequest(workspaceId, requestId); + } + }, + [ + appendResolvedCommandResponse, + appendWorkspaceSystemMessage, + clearWorkspaceSession, + findWorkspace, + finishWorkspaceRequest, + isActiveRequest, + portfolioWorkflow, + pushCommandHistory, + setActiveView, + tabs, + tickerHistory, + ], + ); useEffect(() => { tabs.workspaces.forEach((workspace) => { @@ -384,17 +463,19 @@ export const useTerminalOrchestrator = ({ let disposed = false; let unlisten: (() => void) | undefined; - void mappingsBridge.listenForUpdates((payload) => { - if (!disposed) { - void refreshEarningsPanels(payload); - } - }).then((listener) => { - if (disposed) { - listener(); - return; - } - unlisten = listener; - }); + void mappingsBridge + .listenForUpdates((payload) => { + if (!disposed) { + void refreshEarningsPanels(payload); + } + }) + .then((listener) => { + if (disposed) { + listener(); + return; + } + unlisten = listener; + }); return () => { disposed = true; @@ -402,26 +483,58 @@ export const useTerminalOrchestrator = ({ }; }, [refreshEarningsPanels]); - const runCommand = useCallback((command: string) => { - void handleCommand(command); - }, [handleCommand]); + const runCommand = useCallback( + (workspaceId: string, command: string) => { + void handleCommand(workspaceId, command); + }, + [handleCommand], + ); const resetCommandIndex = useCallback(() => { commandIndexRefs.current[tabs.activeWorkspaceId] = -1; }, [tabs.activeWorkspaceId]); - const resolvePendingAgentApproval = useCallback((approved: boolean) => { - if (!pendingAgentApproval) { - return; - } + const resolvePendingAgentApproval = useCallback( + (workspaceId: string, approved: boolean) => { + const workspace = findWorkspace(workspaceId); + const pendingApproval = workspace?.terminal.pendingApproval; + if (!pendingApproval) { + return; + } - void terminalBridge - .resolveAgentToolApproval({ - approvalId: pendingAgentApproval.approvalId, - approved, - }) - .finally(() => setPendingAgentApproval(null)); - }, [pendingAgentApproval]); + void terminalBridge + .resolveAgentToolApproval({ + approvalId: pendingApproval.approvalId, + approved, + }) + .finally(() => { + const latestWorkspace = findWorkspace(workspaceId); + if ( + latestWorkspace?.terminal.pendingApproval?.approvalId === + pendingApproval.approvalId + ) { + tabs.setWorkspacePendingApproval(workspaceId, null); + } + }); + }, + [findWorkspace, tabs], + ); + + const cancelWorkspaceRequest = useCallback( + async (workspaceId: string) => { + const workspace = findWorkspace(workspaceId); + const activeRequest = workspace?.terminal.activeRequest; + if (!activeRequest || activeRequest.kind !== 'chat') { + return; + } + + await terminalBridge.cancelChatStream({ + workspaceId, + requestId: activeRequest.requestId, + }); + }, + [findWorkspace], + ); const handleStartPortfolioAction = useCallback( (action: PortfolioAction, seed?: PortfolioActionSeed) => { @@ -443,14 +556,13 @@ export const useTerminalOrchestrator = ({ }, [portfolioWorkflow, tabs.activeWorkspaceId]); return { + cancelWorkspaceRequest, commandInputRef, clearWorkspaceSession, handleClearPortfolioAction, handleCommand, handleStartPortfolioAction, handleUpdatePortfolioDraft, - isProcessing, - pendingAgentApproval, resetCommandIndex, resolvePendingAgentApproval, runCommand, diff --git a/MosaicIQ/src/lib/mappingsRefresh.test.ts b/MosaicIQ/src/lib/mappingsRefresh.test.ts index bb492e5..96bb728 100644 --- a/MosaicIQ/src/lib/mappingsRefresh.test.ts +++ b/MosaicIQ/src/lib/mappingsRefresh.test.ts @@ -8,7 +8,21 @@ import { import type { Workspace } from '../hooks/useTabs'; import type { EarningsPanelData } from '../types/financial'; import type { MappingsUpdatedEvent } from '../types/mappings'; -import type { TerminalCommandResponse } from '../types/terminal'; +import type { TerminalCommandResponse, WorkspaceTerminalState } from '../types/terminal'; + +const makeTerminalState = ( + history: WorkspaceTerminalState['history'], +): WorkspaceTerminalState => ({ + history, + chatSessionId: undefined, + inputDraft: '', + isProcessing: false, + activeRequest: null, + pendingApproval: null, + scrollTop: 0, + stickToBottom: true, + collapsedThinkingEntryIds: [], +}); const makeEarningsPanel = ( symbol: string, @@ -78,7 +92,7 @@ const makeWorkspace = ( id, name: id, createdAt: new Date('2026-04-13T10:00:00Z'), - history: [ + terminal: makeTerminalState([ { id: `${id}-command`, type: 'command', @@ -91,7 +105,7 @@ const makeWorkspace = ( content: { type: 'earnings', data: panel }, timestamp: new Date('2026-04-13T10:00:01Z'), }, - ], + ]), }); const makeEvent = ( @@ -122,7 +136,7 @@ describe('mappings refresh targeting', () => { id: 'ws-mixed', name: 'ws-mixed', createdAt: new Date('2026-04-13T10:00:00Z'), - history: [ + terminal: makeTerminalState([ { id: 'cmd-em', type: 'command', @@ -157,7 +171,7 @@ describe('mappings refresh targeting', () => { }, timestamp: new Date('2026-04-13T11:00:01Z'), }, - ], + ]), }; const panel = getRefreshableEarningsPanel(workspace); @@ -173,7 +187,7 @@ describe('mappings refresh targeting', () => { id: 'ws-no-earnings', name: 'ws-no-earnings', createdAt: new Date('2026-04-13T10:00:00Z'), - history: [ + terminal: makeTerminalState([ { id: 'cmd-company', type: 'command', @@ -196,7 +210,7 @@ describe('mappings refresh targeting', () => { }, timestamp: new Date('2026-04-13T10:00:01Z'), }, - ], + ]), }; const panel = getRefreshableEarningsPanel(workspace); diff --git a/MosaicIQ/src/lib/mappingsRefresh.ts b/MosaicIQ/src/lib/mappingsRefresh.ts index 247aa0e..0a452a8 100644 --- a/MosaicIQ/src/lib/mappingsRefresh.ts +++ b/MosaicIQ/src/lib/mappingsRefresh.ts @@ -32,8 +32,8 @@ interface RefreshAffectedEarningsPanelsArgs { export const getRefreshableEarningsPanel = ( workspace: Workspace, ): RefreshableEarningsPanel | null => { - for (let index = workspace.history.length - 1; index >= 0; index -= 1) { - const entry = workspace.history[index]; + for (let index = workspace.terminal.history.length - 1; index >= 0; index -= 1) { + const entry = workspace.terminal.history[index]; if (entry.type !== 'panel' || typeof entry.content === 'string') { continue; } @@ -174,7 +174,7 @@ const findSourceCommand = ( panelIndex: number, ): string | null => { for (let index = panelIndex - 1; index >= 0; index -= 1) { - const candidate = workspace.history[index]; + const candidate = workspace.terminal.history[index]; if ( candidate.type === 'command' && typeof candidate.content === 'string' && diff --git a/MosaicIQ/src/lib/terminalBridge.ts b/MosaicIQ/src/lib/terminalBridge.ts index 12d7157..124d3e0 100644 --- a/MosaicIQ/src/lib/terminalBridge.ts +++ b/MosaicIQ/src/lib/terminalBridge.ts @@ -2,6 +2,7 @@ import { invoke } from '@tauri-apps/api/core'; import { listen, type UnlistenFn } from '@tauri-apps/api/event'; import { AgentStreamItemEvent, + CancelChatStreamRequest, ChatStreamStart, LookupCompanyRequest, ExecuteTerminalCommandRequest, @@ -18,7 +19,7 @@ interface StreamCallbacks { }) => void; } -const createRequestId = (): string => { +export const createTerminalRequestId = (): string => { if (typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function') { return crypto.randomUUID(); } @@ -48,6 +49,7 @@ class TerminalBridge { }); if ( event.payload.kind === 'stream_complete' || + event.payload.kind === 'cancelled' || event.payload.kind === 'error' ) { this.streamCallbacks.delete(event.payload.requestId); @@ -74,53 +76,40 @@ class TerminalBridge { }); } - // CRITICAL ISSUE CONFIRMED: Chat streaming relies on timing workaround instead of explicit protocol - // - // PROBLEM: The frontend registers callbacks AFTER start_chat_stream returns, creating a race condition - // where early stream events can be lost. The backend compensates with a 30ms sleep (see terminal.rs:70), - // but this is brittle and can still fail under load or slower UI conditions. - // - // IMPACT: - // - Data loss: Early stream events may be dropped - // - Non-deterministic: Race window varies with system conditions - // - Not testable: Race conditions are hard to reproduce in tests - // - // RECOMMENDED FIX: Implement explicit handshake: - // 1. Frontend registers callbacks before invoking start_chat_stream - // 2. Backend waits for "ready" signal before streaming - // 3. Or pass callbacks in request payload async startChatStream( - request: Omit, + request: StartChatStreamRequest, callbacks: Omit, ): Promise { await this.ensureListeners(); - const requestId = createRequestId(); - this.streamCallbacks.set(requestId, { + this.streamCallbacks.set(request.requestId, { workspaceId: request.workspaceId, ...callbacks, }); try { const start = await invoke('start_chat_stream', { - request: { - ...request, - requestId, - }, + request, }); - if (start.requestId !== requestId) { - this.streamCallbacks.delete(requestId); + if (start.requestId !== request.requestId) { + this.streamCallbacks.delete(request.requestId); throw new Error('Stream request ID mismatch.'); } return start; } catch (error) { - this.streamCallbacks.delete(requestId); + this.streamCallbacks.delete(request.requestId); throw error; } } + async cancelChatStream(request: CancelChatStreamRequest): Promise { + await invoke('cancel_chat_stream', { + request, + }); + } + async resolveAgentToolApproval( request: ResolveAgentToolApprovalRequest, ): Promise { diff --git a/MosaicIQ/src/types/terminal.ts b/MosaicIQ/src/types/terminal.ts index c283028..a5a63ad 100644 --- a/MosaicIQ/src/types/terminal.ts +++ b/MosaicIQ/src/types/terminal.ts @@ -36,6 +36,11 @@ export interface ExecuteTerminalCommandRequest { input: string; } +export interface CancelChatStreamRequest { + workspaceId: string; + requestId: string; +} + export interface LookupCompanyRequest { symbol: string; } @@ -60,6 +65,20 @@ export interface ResolveAgentToolApprovalRequest { approved: boolean; } +export interface PendingAgentApproval { + approvalId: string; + command: string; + requestId: string; + workspaceId: string; + title: string; + message: string; +} + +export interface TerminalRequestState { + kind: 'slash' | 'chat'; + requestId: string; +} + export type AgentStreamItemKind = | 'reasoning_delta' | 'text_delta' @@ -67,6 +86,7 @@ export type AgentStreamItemKind = | 'tool_result' | 'approval_required' | 'stream_complete' + | 'cancelled' | 'error'; export interface AgentStreamItemEvent { @@ -148,6 +168,18 @@ export interface TerminalState { isProcessing: boolean; } +export interface WorkspaceTerminalState { + history: TerminalEntry[]; + chatSessionId?: string; + inputDraft: string; + isProcessing: boolean; + activeRequest: TerminalRequestState | null; + pendingApproval: PendingAgentApproval | null; + scrollTop: number; + stickToBottom: boolean; + collapsedThinkingEntryIds: string[]; +} + export interface TickerHistorySnapshot { symbol: string; name: string; diff --git a/mosaiciq-performance-audit-optimization-plan.md b/mosaiciq-performance-audit-optimization-plan.md deleted file mode 100644 index a08a3b7..0000000 --- a/mosaiciq-performance-audit-optimization-plan.md +++ /dev/null @@ -1,438 +0,0 @@ -# MosaicIQ Performance Audit & Optimization Plan - -## Scope - -This revision is based on the current code in `MosaicIQ/` and a fresh production build. - -Verified baseline: - -- The main Vite entry chunk is still large: `dist/assets/index-RcdHjVMm.js` is `825.72 kB` minified and `246.79 kB` gzipped. -- The app already lazy-loads `ResearchMode`, `SettingsPage`, and `ResearchGraph`. -- The research backend opens a new SQLite connection inside `spawn_blocking` for every repository call. -- The research UI reloads the full workspace projection after note, ghost, and workspace events. - -## Corrected Executive Summary - -The biggest problems are not “lack of React.memo” or “missing async SQLite” in the abstract. The real bottlenecks are: - -1. `research` and `news` repository calls pay per-call SQLite connection setup cost and serialize a lot of work through `spawn_blocking`. -2. Workspace reads are duplicated and amplified: the backend assembles projections sequentially, and the frontend re-fetches the entire projection after many small events. -3. Background job processing can race. The scheduler loop and `kick_job_processor()` both call `process_due_jobs()`, but claiming work is not atomic. -4. Several hot paths still do N+1 work: `list_sources_by_ids`, note-save loops after link inference, and repeated audit trail fetches for the same selection. -5. Bundle size is a real issue, but the fix is targeted dependency splitting and measurement, not generic `manualChunks` by component file. - -## P0: Fix Correctness + High-Leverage Latency First - -### 1. Make job claiming atomic before adding concurrency - -Why this is first: - -- `spawn_research_scheduler()` runs every 3 seconds. -- `kick_job_processor()` also spawns ad hoc processors on note capture, note update, and retry. -- `process_due_jobs()` reads due jobs, then marks them running in a separate step. - -That means two runners can observe the same queued job and both process it. - -Files: - -- `src-tauri/src/research/pipeline.rs` -- `src-tauri/src/research/service.rs` -- `src-tauri/src/research/repository.rs` - -Evidence: - -- `spawn_research_scheduler()` loops forever and calls `service.process_due_jobs().await`. -- `kick_job_processor()` also spawns `process_due_jobs()`. -- `list_due_jobs()` and `mark_running()` are separate operations. - -Recommended fix: - -1. Replace “list then mark” with a single transactional claim method in the repository. -2. Ensure only one background processor is active at a time, or gate concurrent processors with a mutex/semaphore. -3. Only consider parallel per-job execution after claim semantics are safe. - -Implementation sketch: - -```rust -pub async fn claim_due_jobs(&self, limit: usize) -> Result> { - self.with_connection(move |connection| { - let tx = connection.transaction()?; - - let jobs = { - let mut stmt = tx.prepare( - "SELECT entity_json - FROM pipeline_jobs - WHERE status IN (?1, ?2) - AND (next_attempt_at IS NULL OR next_attempt_at <= ?3) - ORDER BY updated_at ASC - LIMIT ?4" - )?; - // read rows here - }; - - for job in &jobs { - tx.execute( - "UPDATE pipeline_jobs - SET status = ?2, updated_at = ?3, entity_json = ?4 - WHERE id = ?1 AND status IN (?5, ?6)", - params![/* updated running job */], - )?; - } - - tx.commit()?; - Ok(jobs) - }).await -} -``` - -Success criteria: - -- A job ID is never processed twice in logs for one enqueue. -- `kick_job_processor()` no longer creates overlapping workers. - -### 2. Parallelize projection assembly and audit-trail reads - -Why this matters: - -- `get_workspace_projection()` currently does four independent repository reads sequentially. -- `get_note_audit_trail()` also does a sequence of independent reads and then calls an N+1 helper for sources. - -Files: - -- `src-tauri/src/research/service.rs` - -Recommended fix: - -1. Use `tokio::try_join!` in `get_workspace_projection()`. -2. Use `tokio::try_join!` in `get_note_audit_trail()` for links, ghosts, and audit events after loading the note. -3. Deduplicate source IDs before querying sources. - -Implementation sketch: - -```rust -let workspace_fut = self.repository.get_workspace(&request.workspace_id); -let notes_fut = self.repository.list_notes(&request.workspace_id, false, None); -let links_fut = self.repository.list_links(&request.workspace_id, None); -let ghosts_fut = self.repository.list_ghosts(&request.workspace_id, false); - -let (workspace, notes, links, ghosts) = - tokio::try_join!(workspace_fut, notes_fut, links_fut, ghosts_fut)?; -``` - -Success criteria: - -- `get_workspace_projection` latency drops materially under tracing. -- `get_note_audit_trail` no longer performs serial backend waits for independent reads. - -### 3. Stop reloading the full workspace projection on every small event - -Why this matters: - -- `useResearchProjection` schedules a full `getWorkspaceProjection` refetch on workspace, note, and ghost updates. -- Background jobs emit note and ghost updates, so one user action can trigger repeated full projection reloads. - -Files: - -- `src/hooks/useResearchProjection.ts` -- `src/components/Research/ResearchMode.tsx` -- `src/components/Research/ResearchInspector.tsx` - -Recommended fix: - -1. Patch local projection state from event payloads where possible instead of re-fetching the whole projection. -2. Keep full reloads for coarse invalidation only. -3. Share one audit-trail fetch path. Right now both `ResearchMode` and `ResearchInspector` fetch `getNoteAuditTrail()` for the selected note. - -Notes: - -- `ResearchMode` fetches audit trail on selection change. -- `ResearchInspector` fetches the same audit trail again when `note` changes. - -Success criteria: - -- Selecting a note results in one audit-trail request, not two. -- Background enrichment/linking no longer causes repeated full projection fetches for the same workspace state. - -### 4. Batch link-inference writes and avoid unnecessary rewrites - -Why this matters: - -- `process_infer_links()` recalculates links, replaces all links, then re-saves every note in the workspace one by one. -- This is expensive and also creates extra downstream event churn. - -Files: - -- `src-tauri/src/research/service.rs` -- `src-tauri/src/research/repository.rs` - -Recommended fix: - -1. Add a transactional `save_notes_batch`. -2. Only persist notes whose inferred-link set actually changed. -3. Consider diffing links before `replace_links_for_workspace()` to avoid full delete/reinsert when unchanged. - -Success criteria: - -- Large workspaces no longer perform `N` separate note saves after every link inference pass. -- No-op link inference produces minimal writes. - -## P1: Reduce Database Overhead Without Prematurely Rewriting the Stack - -### 5. Reuse SQLite connections instead of opening one per repository call - -What is true: - -- `with_connection()` in both repositories uses `spawn_blocking` and opens a fresh SQLite connection every time. - -What is not yet justified: - -- A full migration to `sqlx` should not be the first recommendation. SQLite “async” drivers still use background threads internally, and the current biggest cost is repeated connection setup plus query shape, not just the driver choice. - -Files: - -- `src-tauri/src/research/repository.rs` -- `src-tauri/src/news/repository.rs` - -Recommended fix order: - -1. Keep `rusqlite` initially. -2. Introduce a small connection pool or a dedicated DB worker with persistent connections. -3. Re-measure before considering a driver migration. - -Candidate approaches: - -- `r2d2_sqlite` -- `deadpool-sqlite` -- one long-lived DB thread per subsystem if contention remains low - -Success criteria: - -- Repository calls no longer pay `Connection::open()` and PRAGMA setup on every operation. - -### 6. Replace `list_sources_by_ids` N+1 lookup with a single query - -Why this matters: - -- `list_sources_by_ids()` loops over IDs and executes one query per source. -- `get_note_audit_trail()` and `process_refresh_source()` both depend on it. - -Files: - -- `src-tauri/src/research/repository.rs` - -Recommended fix: - -1. Deduplicate incoming IDs. -2. Build a single `IN (?, ?, ...)` query. -3. Preserve input order in memory if the caller depends on it. - -Success criteria: - -- Source lookup for audit trails becomes one DB round-trip instead of many. - -### 7. Add the indexes that are actually missing - -The draft overstated this area. The repository already creates several useful indexes for notes, links, ghosts, sources, and jobs. - -Real candidates: - -1. `audit_events(entity_id, created_at)` for note audit trails. -2. `audit_events(workspace_id, created_at)` for bundle export. -3. `research_notes(workspace_id, source_id, note_type)` for `find_source_reference_note()`. -4. Source checksum/accession should not rely on unindexed `json_extract(entity_json, ...)` lookups on a hot path. - -Files: - -- `src-tauri/src/research/repository.rs` - -Recommended fix: - -1. Add the missing audit and source-reference indexes. -2. Promote checksum and filing accession into indexed columns, or add generated columns if that fits the migration strategy. - -Success criteria: - -- Query plans for audit-trail and source-dedup queries stop full-scanning growing tables. - -### 8. Normalize time comparisons for due jobs - -Why this matters: - -- `next_attempt_at` is written as RFC 3339 text. -- `list_due_jobs()` compares it to `datetime('now')`. - -That string comparison is fragile because the formats do not match exactly. - -Files: - -- `src-tauri/src/research/repository.rs` -- `src-tauri/src/research/util.rs` - -Recommended fix: - -1. Store retry timestamps as integer epoch seconds, or -2. Store them in a SQLite-compatible normalized format consistently. - -Success criteria: - -- Retry timing is deterministic and easy to index. - -## P2: Frontend and Bundle Work Based On Measured Hotspots - -### 9. Shrink the main bundle with dependency-level splitting - -What is true: - -- The main entry chunk is still `825.72 kB`. -- That is large enough to justify work. - -What is already done: - -- `App.tsx` already lazy-loads `ResearchMode` and `SettingsPage`. -- `ResearchMode.tsx` already lazy-loads `ResearchGraph`. - -What needs correction: - -- `manualChunks` should split heavy dependencies, not component source paths. -- Do bundle analysis first so chunk rules map to real weight. - -Files: - -- `vite.config.ts` -- `src/App.tsx` -- heavy feature entry points in `src/components` - -Recommended fix: - -1. Add a bundle analyzer for one pass. -2. Split by heavy libraries that dominate the main chunk. -3. Lazy-load rarely used panels or dependencies if they currently land in `index`. - -Success criteria: - -- The main `index-*.js` chunk is materially smaller after analysis-driven chunking. - -### 10. Treat `React.memo` as a profiling tool, not a blanket rule - -The draft was too broad here. - -Current state: - -- There is already lazy loading and targeted `useMemo`/`useDeferredValue` usage. -- There is no evidence yet that “all components need `React.memo`”. - -Recommended fix: - -1. Profile note-heavy views first. -2. Memoize only components that are proven hot and receive stable props. -3. Prefer fixing projection refresh storms and duplicate fetches before adding memo wrappers. - -Possible targets after profiling: - -- `NoteCard` -- `GhostCard` -- large list/board containers - -### 11. Do not treat file length as a performance bug by itself - -Large files such as `CommandInput.tsx` and `ResearchMode.tsx` are maintainability concerns, but splitting them does not automatically improve runtime performance. - -Recommended posture: - -- Refactor them when it enables real wins such as better lazy-loading boundaries, simpler state ownership, or isolated expensive subtrees. - -### 12. Defer virtualization until real note-count profiling justifies it - -Virtualized lists may become necessary for very large workspaces, but current higher-order bottlenecks are above them: - -- full projection refetches -- duplicate audit fetches -- batch-save inefficiencies -- non-atomic background job processing - -Add virtualization only after measuring large note canvases/boards. - -## Items Removed Or Downgraded From The Original Draft - -These were either incorrect, overstated, or not yet justified: - -- “No React.memo on components” as a top-level diagnosis. -- “Switch to sqlx” as the default first fix. -- Blanket entity caching in the repository. -- “Missing indexes” as a generic claim. -- “Excessive startTransition” as a meaningful bottleneck without profiling. -- “Potential memory leaks in Arc references” based on a file path that does not exist in this repo. -- Redis, memcached, or GraphQL recommendations for a local-first desktop app. - -## Recommended Execution Order - -### Phase 1: Stabilize background processing and remove duplicate work - -1. Atomic job claim + single-runner guard. -2. Parallelize `get_workspace_projection()`. -3. Parallelize `get_note_audit_trail()` and replace `list_sources_by_ids()` N+1. -4. Remove duplicate audit-trail fetches between `ResearchMode` and `ResearchInspector`. -5. Reduce full projection refetches from research events. - -Expected impact: - -- Better correctness. -- Faster workspace and inspector loads. -- Less UI thrash during background processing. - -### Phase 2: Reduce SQLite overhead and write amplification - -1. Introduce persistent SQLite connections. -2. Batch note saves after link inference. -3. Avoid rewriting unchanged inferred links/notes. -4. Add missing indexes and normalize due-job timestamp storage. - -Expected impact: - -- Lower latency under repeated research activity. -- Better scalability as workspaces grow. - -### Phase 3: Bundle and rendering cleanup - -1. Analyze the main chunk. -2. Split heavy dependencies intentionally. -3. Memoize only proven hot components. -4. Consider virtualization only if measured note counts require it. - -Expected impact: - -- Faster startup. -- Less JS to parse on initial load. - -## Verification Plan - -Add measurement before and after each phase. - -Backend: - -- Log timing for `get_workspace_projection`, `get_note_audit_trail`, `process_due_jobs`, and link inference. -- Log claimed job IDs so duplicate processing is visible immediately. -- Use `EXPLAIN QUERY PLAN` for audit-event and source-dedup queries after schema changes. - -Frontend: - -- Count `getWorkspaceProjection` and `getNoteAuditTrail` invocations during a single note capture flow. -- Use React Profiler on research views before adding memoization. -- Keep bundle-size snapshots from `npm run build`. - -Success targets: - -- One audit-trail request per note selection. -- No duplicate job processing. -- Meaningfully fewer projection refetches during enrichment/linking. -- Smaller main bundle than the current `825.72 kB`. - -## Short Version - -If only a few changes get done, do these first: - -1. Make job claiming atomic and prevent overlapping processors. -2. Parallelize projection and audit-trail reads. -3. Stop full projection refetches and duplicate audit-trail fetches in the research UI. -4. Batch note writes after link inference. -5. Reuse SQLite connections before considering a driver migration.