diff --git a/MosaicIQ/src-tauri/src/agent/gateway.rs b/MosaicIQ/src-tauri/src/agent/gateway.rs index e04de19..dd6b85f 100644 --- a/MosaicIQ/src-tauri/src/agent/gateway.rs +++ b/MosaicIQ/src-tauri/src/agent/gateway.rs @@ -1,7 +1,6 @@ -use std::pin::Pin; use std::sync::Arc; -use futures::{future::BoxFuture, Stream, StreamExt}; +use futures::{future::BoxFuture, StreamExt}; use rig::{ agent::MultiTurnStreamItem, client::completion::CompletionClient, @@ -10,9 +9,8 @@ use rig::{ providers::openai, streaming::{StreamedAssistantContent, StreamingPrompt}, }; -use tauri::{AppHandle, Wry}; -use tokio::sync::mpsc; +use crate::agent::stream_events::AgentStreamEmitter; use crate::agent::tools::terminal_command::{AgentCommandExecutor, RunTerminalCommandTool}; use crate::agent::AgentRuntimeConfig; use crate::error::AppError; @@ -21,17 +19,12 @@ use crate::state::PendingAgentToolApprovals; const SYSTEM_PROMPT: &str = "You are MosaicIQ's terminal chat assistant. Answer concisely in plain text. Use the available terminal command tool whenever current workspace data or live MosaicIQ terminal actions would improve the answer. Never claim to have run a command unless the tool actually ran it. If the request is unclear, ask a short clarifying question."; const MAX_TOOL_TURNS: usize = 4; -/// Streaming text output from the upstream chat provider. -pub type ChatGatewayStream = Pin> + Send>>; - #[derive(Clone)] pub struct AgentToolRuntimeContext { - pub app_handle: AppHandle, + pub stream_emitter: Arc>, pub command_executor: Arc, pub pending_approvals: Arc, pub workspace_id: String, - pub request_id: String, - pub session_id: String, } /// Trait used by the agent service so tests can inject a deterministic gateway. @@ -44,7 +37,7 @@ pub trait ChatGateway: Clone + Send + Sync + 'static { context_messages: Vec, history: Vec, tool_runtime: AgentToolRuntimeContext, - ) -> BoxFuture<'static, Result>; + ) -> BoxFuture<'static, Result>; } /// Production Rig-backed gateway using the OpenAI-compatible chat completions API. @@ -59,7 +52,7 @@ impl ChatGateway for RigChatGateway { context_messages: Vec, history: Vec, tool_runtime: AgentToolRuntimeContext, - ) -> BoxFuture<'static, Result> { + ) -> BoxFuture<'static, Result> { Box::pin(async move { let api_key = runtime.api_key.unwrap_or_default(); let client = openai::CompletionsClient::builder() @@ -70,12 +63,10 @@ impl ChatGateway for RigChatGateway { let history = compose_request_messages(context_messages, history); let tool = RunTerminalCommandTool { - app_handle: tool_runtime.app_handle, + stream_emitter: tool_runtime.stream_emitter.clone(), command_executor: tool_runtime.command_executor, pending_approvals: tool_runtime.pending_approvals, workspace_id: tool_runtime.workspace_id, - request_id: tool_runtime.request_id, - session_id: tool_runtime.session_id, }; let mut rig_stream = client @@ -91,41 +82,60 @@ impl ChatGateway for RigChatGateway { .multi_turn(MAX_TOOL_TURNS) .await; - let (sender, receiver) = mpsc::unbounded_channel::>(); - tauri::async_runtime::spawn(async move { - let mut saw_text = false; + let mut reply = String::new(); + let mut saw_text = false; + let mut saw_reasoning_delta = false; - while let Some(item) = rig_stream.next().await { - match item { - Ok(MultiTurnStreamItem::StreamAssistantItem( - StreamedAssistantContent::Text(text), - )) => { - saw_text = true; - if sender.send(Ok(text.text)).is_err() { - return; - } + while let Some(item) = rig_stream.next().await { + match item { + Ok(MultiTurnStreamItem::StreamAssistantItem( + StreamedAssistantContent::Text(text), + )) => { + saw_text = true; + reply.push_str(&text.text); + tool_runtime.stream_emitter.text_delta(text.text)?; + } + Ok(MultiTurnStreamItem::StreamAssistantItem( + StreamedAssistantContent::Reasoning(reasoning), + )) => { + if saw_reasoning_delta { + continue; } - Ok(MultiTurnStreamItem::FinalResponse(final_response)) => { - if !saw_text && !final_response.response().is_empty() { - let _ = sender.send(Ok(final_response.response().to_string())); - } - return; + + let text = reasoning_text(&reasoning); + if text.is_empty() { + continue; } - Ok(_) => {} - Err(error) => { - let _ = sender.send(Err(map_streaming_error(error))); - return; + + tool_runtime.stream_emitter.reasoning_delta(text)?; + } + Ok(MultiTurnStreamItem::StreamAssistantItem( + StreamedAssistantContent::ReasoningDelta { reasoning, .. }, + )) => { + saw_reasoning_delta = true; + tool_runtime.stream_emitter.reasoning_delta(reasoning)?; + } + Ok(MultiTurnStreamItem::StreamAssistantItem( + StreamedAssistantContent::ToolCall { .. }, + )) => {} + Ok(MultiTurnStreamItem::StreamAssistantItem( + StreamedAssistantContent::ToolCallDelta { .. }, + )) => {} + Ok(MultiTurnStreamItem::StreamUserItem(_)) => {} + Ok(MultiTurnStreamItem::FinalResponse(final_response)) => { + if !saw_text && !final_response.response().is_empty() { + reply.push_str(final_response.response()); + tool_runtime + .stream_emitter + .text_delta(final_response.response().to_string())?; } } + Ok(_) => {} + Err(error) => return Err(map_streaming_error(error)), } - }); + } - let stream = futures::stream::unfold(receiver, |mut receiver| async move { - receiver.recv().await.map(|item| (item, receiver)) - }); - - let stream: ChatGatewayStream = Box::pin(stream); - Ok(stream) + Ok(reply) }) } } @@ -137,6 +147,22 @@ fn compose_request_messages( context_messages.into_iter().chain(history).collect() } +fn reasoning_text(reasoning: &rig::message::Reasoning) -> String { + use rig::message::ReasoningContent; + + reasoning + .content + .iter() + .filter_map(|block| match block { + ReasoningContent::Text { text, .. } => Some(text.as_str()), + ReasoningContent::Summary(text) => Some(text.as_str()), + ReasoningContent::Encrypted(_) | ReasoningContent::Redacted { .. } => None, + _ => None, + }) + .collect::>() + .join("\n") +} + fn map_streaming_error(error: rig::agent::StreamingError) -> AppError { AppError::ProviderRequest(error.to_string()) } diff --git a/MosaicIQ/src-tauri/src/agent/mod.rs b/MosaicIQ/src-tauri/src/agent/mod.rs index 97c437a..7330712 100644 --- a/MosaicIQ/src-tauri/src/agent/mod.rs +++ b/MosaicIQ/src-tauri/src/agent/mod.rs @@ -5,17 +5,18 @@ mod panel_context; mod routing; mod service; mod settings; +mod stream_events; mod tools; mod types; pub use gateway::{AgentToolRuntimeContext, ChatGateway, RigChatGateway}; pub use service::AgentService; pub(crate) use settings::AgentSettingsService; +pub use stream_events::AgentStreamEmitter; pub use types::{ - default_task_defaults, AgentConfigStatus, AgentDeltaEvent, AgentErrorEvent, AgentResultEvent, - AgentRuntimeConfig, AgentStoredSettings, AgentTaskRoute, AgentToolApprovalRequiredEvent, - AgentToolCommandEvent, AgentToolResultEvent, ChatPanelContext, ChatPromptRequest, - ChatStreamStart, PreparedChatTurn, RemoteProviderSettings, + default_task_defaults, AgentConfigStatus, AgentRuntimeConfig, AgentStoredSettings, + AgentStreamItemEvent, AgentStreamItemKind, AgentTaskRoute, ChatPanelContext, + ChatPromptRequest, ChatStreamStart, PreparedChatTurn, RemoteProviderSettings, ResolveAgentToolApprovalRequest, SaveAgentRuntimeConfigRequest, TaskProfile, UpdateRemoteApiKeyRequest, AGENT_SETTINGS_STORE_PATH, DEFAULT_REMOTE_BASE_URL, DEFAULT_REMOTE_MODEL, diff --git a/MosaicIQ/src-tauri/src/agent/stream_events.rs b/MosaicIQ/src-tauri/src/agent/stream_events.rs new file mode 100644 index 0000000..86f65ea --- /dev/null +++ b/MosaicIQ/src-tauri/src/agent/stream_events.rs @@ -0,0 +1,143 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +use tauri::{AppHandle, Emitter, Runtime}; + +use crate::agent::{AgentStreamItemEvent, AgentStreamItemKind}; +use crate::error::AppError; +use crate::terminal::TerminalCommandResponse; + +pub struct AgentStreamEmitter { + app_handle: AppHandle, + workspace_id: String, + request_id: String, + session_id: String, + next_sequence: AtomicU64, +} + +impl AgentStreamEmitter { + pub fn new( + app_handle: AppHandle, + workspace_id: String, + request_id: String, + session_id: String, + ) -> Self { + Self { + app_handle, + workspace_id, + request_id, + session_id, + next_sequence: AtomicU64::new(1), + } + } + + pub fn emit(&self, event: AgentStreamItemEvent) -> Result<(), AppError> { + self.app_handle + .emit("agent_stream_item", event) + .map_err(|error| AppError::ProviderRequest(error.to_string())) + } + + pub fn reasoning_delta(&self, delta: String) -> Result<(), AppError> { + if delta.is_empty() { + return Ok(()); + } + + self.emit(self.event(AgentStreamItemKind::ReasoningDelta).with_delta(delta)) + } + + pub fn text_delta(&self, delta: String) -> Result<(), AppError> { + if delta.is_empty() { + return Ok(()); + } + + self.emit(self.event(AgentStreamItemKind::TextDelta).with_delta(delta)) + } + + pub fn tool_command(&self, command: String) -> Result<(), AppError> { + self.emit(self.event(AgentStreamItemKind::ToolCommand).with_command(command)) + } + + pub fn tool_result( + &self, + command: String, + response: TerminalCommandResponse, + ) -> Result<(), AppError> { + self.emit( + self.event(AgentStreamItemKind::ToolResult) + .with_command(command) + .with_response(response), + ) + } + + pub fn approval_required( + &self, + approval_id: String, + command: String, + title: String, + message: String, + ) -> Result<(), AppError> { + self.emit( + self.event(AgentStreamItemKind::ApprovalRequired) + .with_approval(approval_id, title, message) + .with_command(command), + ) + } + + pub fn stream_complete(&self) -> Result<(), AppError> { + self.emit(self.event(AgentStreamItemKind::StreamComplete)) + } + + pub fn error(&self, message: String) -> Result<(), AppError> { + self.emit(self.event(AgentStreamItemKind::Error).with_error(message)) + } + + pub fn session_id(&self) -> &str { + &self.session_id + } + + fn event(&self, kind: AgentStreamItemKind) -> AgentStreamItemEvent { + AgentStreamItemEvent { + workspace_id: self.workspace_id.clone(), + request_id: self.request_id.clone(), + session_id: self.session_id.clone(), + sequence: self.next_sequence.fetch_add(1, Ordering::Relaxed), + kind, + delta: None, + command: None, + response: None, + approval_id: None, + title: None, + message: None, + error_message: None, + } + } +} + +impl AgentStreamItemEvent { + fn with_delta(mut self, delta: String) -> Self { + self.delta = Some(delta); + self + } + + fn with_command(mut self, command: String) -> Self { + self.command = Some(command); + self + } + + fn with_response(mut self, response: TerminalCommandResponse) -> Self { + self.response = Some(response); + self + } + + fn with_approval(mut self, approval_id: String, title: String, message: String) -> Self { + self.approval_id = Some(approval_id); + self.title = Some(title); + self.message = Some(message); + self + } + + fn with_error(mut self, error_message: String) -> Self { + self.error_message = Some(error_message); + self + } +} + diff --git a/MosaicIQ/src-tauri/src/agent/tools/terminal_command.rs b/MosaicIQ/src-tauri/src/agent/tools/terminal_command.rs index 88064d5..73b60b5 100644 --- a/MosaicIQ/src-tauri/src/agent/tools/terminal_command.rs +++ b/MosaicIQ/src-tauri/src/agent/tools/terminal_command.rs @@ -7,13 +7,11 @@ use rig::completion::ToolDefinition; use rig::tool::Tool; use serde::Deserialize; use serde_json::json; -use tauri::{AppHandle, Emitter, Runtime}; +use tauri::Runtime; use tokio::time::timeout; +use crate::agent::stream_events::AgentStreamEmitter; use crate::agent::panel_context::{compact_panel_payload, panel_type}; -use crate::agent::{ - AgentToolApprovalRequiredEvent, AgentToolCommandEvent, AgentToolResultEvent, -}; use crate::error::AppError; use crate::state::PendingAgentToolApprovals; use crate::terminal::{ @@ -40,12 +38,10 @@ impl AgentCommandExecutor for TerminalCommandService { #[derive(Clone)] pub struct RunTerminalCommandTool { - pub app_handle: AppHandle, + pub stream_emitter: Arc>, pub command_executor: Arc, pub pending_approvals: Arc, pub workspace_id: String, - pub request_id: String, - pub session_id: String, } #[derive(Debug, Deserialize)] @@ -57,8 +53,6 @@ pub struct RunTerminalCommandArgs { pub enum RunTerminalCommandToolError { #[error("{0}")] App(#[from] AppError), - #[error("failed to emit terminal event: {0}")] - Emit(String), #[error("failed to serialize tool result: {0}")] Serialize(String), } @@ -127,63 +121,39 @@ impl Tool for RunTerminalCommandTool { }) .await; - self.emit_result(response.clone())?; + self.emit_result(&command, response.clone())?; serialize_response_tool_result(command, response) } } impl RunTerminalCommandTool { fn emit_command(&self, command: &str) -> Result<(), RunTerminalCommandToolError> { - self.app_handle - .emit( - "agent_tool_command", - AgentToolCommandEvent { - workspace_id: self.workspace_id.clone(), - request_id: self.request_id.clone(), - session_id: self.session_id.clone(), - command: command.to_string(), - }, - ) - .map_err(|error| RunTerminalCommandToolError::Emit(error.to_string())) + self.stream_emitter.tool_command(command.to_string())?; + Ok(()) } fn emit_result( &self, + command: &str, response: TerminalCommandResponse, ) -> Result<(), RunTerminalCommandToolError> { - self.app_handle - .emit( - "agent_tool_result", - AgentToolResultEvent { - workspace_id: self.workspace_id.clone(), - request_id: self.request_id.clone(), - session_id: self.session_id.clone(), - response, - }, - ) - .map_err(|error| RunTerminalCommandToolError::Emit(error.to_string())) + self.stream_emitter + .tool_result(command.to_string(), response)?; + Ok(()) } async fn await_approval(&self, command: &str) -> Result { let (approval_id, receiver) = self.pending_approvals.register()?; - self.app_handle - .emit( - "agent_tool_approval_required", - AgentToolApprovalRequiredEvent { - workspace_id: self.workspace_id.clone(), - request_id: self.request_id.clone(), - session_id: self.session_id.clone(), - approval_id: approval_id.clone(), - command: command.to_string(), - title: "Approve portfolio command".to_string(), - message: format!( - "The agent wants to run a portfolio-changing command:\n\n{}\n\nApprove this action to continue.", - command - ), - }, - ) - .map_err(|error| RunTerminalCommandToolError::Emit(error.to_string()))?; + self.stream_emitter.approval_required( + approval_id.clone(), + command.to_string(), + "Approve portfolio command".to_string(), + format!( + "The agent wants to run a portfolio-changing command:\n\n{}\n\nApprove this action to continue.", + command + ), + )?; let result = timeout(APPROVAL_TIMEOUT, receiver).await; match result { @@ -287,6 +257,7 @@ mod tests { command_name, is_allowed_agent_command, is_write_command, normalize_command, AgentCommandExecutor, RunTerminalCommandArgs, RunTerminalCommandTool, }; + use crate::agent::stream_events::AgentStreamEmitter; use crate::state::PendingAgentToolApprovals; use crate::terminal::{ Company, ExecuteTerminalCommandRequest, PanelPayload, TerminalCommandResponse, @@ -352,12 +323,15 @@ mod tests { })); let tool = RunTerminalCommandTool { - app_handle: app.handle().clone(), + stream_emitter: Arc::new(AgentStreamEmitter::new( + app.handle().clone(), + "workspace-1".to_string(), + "request-1".to_string(), + "session-1".to_string(), + )), command_executor: executor.clone(), pending_approvals: Arc::new(PendingAgentToolApprovals::new()), workspace_id: "workspace-1".to_string(), - request_id: "request-1".to_string(), - session_id: "session-1".to_string(), }; let result = tool @@ -382,12 +356,15 @@ mod tests { })); let tool = RunTerminalCommandTool { - app_handle: app.handle().clone(), + stream_emitter: Arc::new(AgentStreamEmitter::new( + app.handle().clone(), + "workspace-1".to_string(), + "request-1".to_string(), + "session-1".to_string(), + )), command_executor: executor.clone(), pending_approvals: approvals.clone(), workspace_id: "workspace-1".to_string(), - request_id: "request-1".to_string(), - session_id: "session-1".to_string(), }; let approvals_for_task = approvals.clone(); @@ -417,12 +394,15 @@ mod tests { })); let tool = RunTerminalCommandTool { - app_handle: app.handle().clone(), + stream_emitter: Arc::new(AgentStreamEmitter::new( + app.handle().clone(), + "workspace-1".to_string(), + "request-1".to_string(), + "session-1".to_string(), + )), command_executor: executor.clone(), pending_approvals: approvals.clone(), workspace_id: "workspace-1".to_string(), - request_id: "request-1".to_string(), - session_id: "session-1".to_string(), }; let approvals_for_task = approvals.clone(); diff --git a/MosaicIQ/src-tauri/src/agent/types.rs b/MosaicIQ/src-tauri/src/agent/types.rs index 41fad6c..dd6af7a 100644 --- a/MosaicIQ/src-tauri/src/agent/types.rs +++ b/MosaicIQ/src-tauri/src/agent/types.rs @@ -69,67 +69,40 @@ pub struct ChatStreamStart { pub session_id: String, } -/// Incremental delta emitted while the backend streams a reply. -#[derive(Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AgentDeltaEvent { - pub workspace_id: String, - pub request_id: String, - pub session_id: String, - pub delta: String, +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum AgentStreamItemKind { + ReasoningDelta, + TextDelta, + ToolCommand, + ToolResult, + ApprovalRequired, + StreamComplete, + Error, } -/// Final reply emitted when the backend completes a stream. #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] -pub struct AgentResultEvent { +pub struct AgentStreamItemEvent { pub workspace_id: String, pub request_id: String, pub session_id: String, - pub reply: String, -} - -/// Event emitted when the agent decides to run a terminal command tool. -#[derive(Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AgentToolCommandEvent { - pub workspace_id: String, - pub request_id: String, - pub session_id: String, - pub command: String, -} - -/// Event emitted after an agent-triggered command completes. -#[derive(Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AgentToolResultEvent { - pub workspace_id: String, - pub request_id: String, - pub session_id: String, - pub response: TerminalCommandResponse, -} - -/// Event emitted when the agent requests approval for a mutating command. -#[derive(Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AgentToolApprovalRequiredEvent { - pub workspace_id: String, - pub request_id: String, - pub session_id: String, - pub approval_id: String, - pub command: String, - pub title: String, - pub message: String, -} - -/// Error event emitted when the backend cannot complete a stream. -#[derive(Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct AgentErrorEvent { - pub workspace_id: String, - pub request_id: String, - pub session_id: String, - pub message: String, + pub sequence: u64, + pub kind: AgentStreamItemKind, + #[serde(skip_serializing_if = "Option::is_none")] + pub delta: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub command: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub response: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub approval_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub title: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error_message: Option, } /// Frontend request payload for approving or denying an agent-triggered write command. diff --git a/MosaicIQ/src-tauri/src/commands/terminal.rs b/MosaicIQ/src-tauri/src/commands/terminal.rs index 102fb97..705bc43 100644 --- a/MosaicIQ/src-tauri/src/commands/terminal.rs +++ b/MosaicIQ/src-tauri/src/commands/terminal.rs @@ -1,11 +1,10 @@ use std::time::Duration; -use futures::StreamExt; -use tauri::{Emitter, Manager}; +use tauri::Manager; use crate::agent::{ - AgentDeltaEvent, AgentErrorEvent, AgentResultEvent, AgentToolRuntimeContext, ChatGateway, - ChatPromptRequest, ChatStreamStart, ResolveAgentToolApprovalRequest, + AgentStreamEmitter, AgentToolRuntimeContext, ChatGateway, ChatPromptRequest, ChatStreamStart, + ResolveAgentToolApprovalRequest, }; use crate::state::AppState; use crate::terminal::{ @@ -59,89 +58,46 @@ pub async fn start_chat_stream( let app_handle = app.clone(); let command_executor = state.command_service.clone(); let pending_approvals = state.pending_agent_tool_approvals.clone(); + let stream_emitter = std::sync::Arc::new(AgentStreamEmitter::new( + app_handle.clone(), + prepared_turn.workspace_id.clone(), + request_id.clone(), + prepared_turn.session_id.clone(), + )); tauri::async_runtime::spawn(async move { tokio::time::sleep(Duration::from_millis(30)).await; // Resolve the upstream stream outside the mutex so long-running provider I/O // does not block other settings reads or chat requests. - let mut stream = match gateway + let reply = match gateway .stream_chat( prepared_turn.runtime.clone(), prepared_turn.prompt.clone(), prepared_turn.context_messages.clone(), prepared_turn.history.clone(), AgentToolRuntimeContext { - app_handle: app_handle.clone(), + stream_emitter: stream_emitter.clone(), command_executor, pending_approvals, workspace_id: prepared_turn.workspace_id.clone(), - request_id: request_id.clone(), - session_id: prepared_turn.session_id.clone(), }, ) .await { - Ok(stream) => stream, + Ok(reply) => reply, Err(error) => { - let _ = app_handle.emit( - "agent_error", - AgentErrorEvent { - workspace_id: prepared_turn.workspace_id, - request_id, - session_id: prepared_turn.session_id, - message: error.to_string(), - }, - ); + let _ = stream_emitter.error(error.to_string()); return; } }; - let mut reply = String::new(); - - while let Some(chunk) = stream.next().await { - match chunk { - Ok(delta) => { - reply.push_str(&delta); - let _ = app_handle.emit( - "agent_delta", - AgentDeltaEvent { - workspace_id: prepared_turn.workspace_id.clone(), - request_id: request_id.clone(), - session_id: prepared_turn.session_id.clone(), - delta, - }, - ); - } - Err(error) => { - let _ = app_handle.emit( - "agent_error", - AgentErrorEvent { - workspace_id: prepared_turn.workspace_id, - request_id, - session_id: prepared_turn.session_id, - message: error.to_string(), - }, - ); - return; - } - } - } - // Store the final assistant message after the stream completes so the next // conversational turn reuses the full transcript. if let Ok(mut agent) = app_handle.state::().agent.lock() { let _ = agent.record_assistant_reply(&prepared_turn.session_id, &reply); } - let _ = app_handle.emit( - "agent_result", - AgentResultEvent { - workspace_id: prepared_turn.workspace_id, - request_id, - session_id: prepared_turn.session_id, - reply, - }, - ); + let _ = stream_emitter.stream_complete(); }); Ok(start) diff --git a/MosaicIQ/src/App.tsx b/MosaicIQ/src/App.tsx index d3e0f5b..54777c3 100644 --- a/MosaicIQ/src/App.tsx +++ b/MosaicIQ/src/App.tsx @@ -22,6 +22,7 @@ import { terminalBridge } from './lib/terminalBridge'; import { AgentConfigStatus } from './types/agentSettings'; import { Portfolio } from './types/financial'; import { + AgentStreamItemEvent, ResolvedTerminalCommandResponse, PortfolioAction, PortfolioActionDraft, @@ -212,18 +213,141 @@ function App() { return; } - // Plain text keeps the current workspace conversation alive and streams into a placeholder response entry. + // Plain text keeps the current workspace conversation alive and appends streamed items in arrival order. const panelContext = extractChatPanelContext(currentWorkspace?.history ?? []); const commandEntry = createEntry({ type: 'command', content: resolvedCommand }); - const responseEntry = createEntry({ - type: 'response', - content: '', - renderMode: 'markdown', - }); - const toolCommandQueue: string[] = []; + let lastSequenceSeen = 0; + let activeTextEntryId: string | null = null; + let activeThinkingEntryId: string | null = null; tabs.appendWorkspaceEntry(workspaceId, commandEntry); - tabs.appendWorkspaceEntry(workspaceId, responseEntry); + + const appendStreamEntry = ( + type: 'response' | 'thinking', + renderMode: 'plain' | 'markdown', + ) => { + const entry = createEntry({ + type, + content: '', + renderMode, + }); + tabs.appendWorkspaceEntry(workspaceId, entry); + + if (type === 'response') { + activeTextEntryId = entry.id; + } else { + activeThinkingEntryId = entry.id; + } + + return entry.id; + }; + + const closeTextSegment = () => { + activeTextEntryId = null; + }; + + const closeThinkingSegment = () => { + activeThinkingEntryId = null; + }; + + const appendDeltaToEntry = (entryId: string, delta: string, renderMode: 'plain' | 'markdown') => { + tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({ + ...entry, + content: typeof entry.content === 'string' ? `${entry.content}${delta}` : delta, + renderMode, + timestamp: new Date(), + })); + }; + + const processStreamItem = (event: Omit & { + response?: ResolvedTerminalCommandResponse; + }) => { + if (event.sequence <= lastSequenceSeen) { + return; + } + lastSequenceSeen = event.sequence; + + switch (event.kind) { + case 'reasoning_delta': { + if (!event.delta) { + return; + } + closeTextSegment(); + const entryId = + activeThinkingEntryId ?? appendStreamEntry('thinking', 'plain'); + appendDeltaToEntry(entryId, event.delta, 'plain'); + return; + } + case 'text_delta': { + if (!event.delta) { + return; + } + closeThinkingSegment(); + const entryId = + activeTextEntryId ?? appendStreamEntry('response', 'markdown'); + appendDeltaToEntry(entryId, event.delta, 'markdown'); + return; + } + case 'tool_command': { + if (!event.command) { + return; + } + closeThinkingSegment(); + closeTextSegment(); + tabs.appendWorkspaceEntry( + workspaceId, + createEntry({ type: 'command', content: event.command }), + ); + return; + } + case 'tool_result': { + if (!event.command || !event.response) { + return; + } + closeThinkingSegment(); + closeTextSegment(); + appendResolvedCommandResponse(workspaceId, event.command, event.response); + return; + } + case 'approval_required': { + if (!event.approvalId || !event.command || !event.title || !event.message) { + return; + } + setPendingAgentApproval({ + approvalId: event.approvalId, + command: event.command, + requestId: event.requestId, + workspaceId: event.workspaceId, + title: event.title, + message: event.message, + }); + return; + } + case 'stream_complete': { + tabs.setWorkspaceSession(workspaceId, event.sessionId); + closeThinkingSegment(); + closeTextSegment(); + setIsProcessing(false); + return; + } + case 'error': { + const entryId = activeTextEntryId ?? appendStreamEntry('response', 'plain'); + closeThinkingSegment(); + activeTextEntryId = entryId; + tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({ + ...entry, + type: 'error', + content: event.errorMessage ?? 'Chat stream failed.', + renderMode: 'plain', + timestamp: new Date(), + })); + setIsProcessing(false); + return; + } + default: + return; + } + }; try { const start = await terminalBridge.startChatStream( @@ -235,63 +359,14 @@ function App() { panelContext, }, { - onDelta: (event) => { - // Update only the originating entry so workspace switches do not disrupt the active stream. - tabs.updateWorkspaceEntry(workspaceId, responseEntry.id, (entry) => ({ - ...entry, - content: typeof entry.content === 'string' ? `${entry.content}${event.delta}` : event.delta, - renderMode: 'markdown', - timestamp: new Date(), - })); - }, - onResult: (event) => { - tabs.setWorkspaceSession(workspaceId, event.sessionId); - tabs.updateWorkspaceEntry(workspaceId, responseEntry.id, (entry) => ({ - ...entry, - type: 'response', - content: event.reply, - renderMode: 'markdown', - timestamp: new Date(), - })); - setIsProcessing(false); - }, - onError: (event) => { - tabs.updateWorkspaceEntry(workspaceId, responseEntry.id, (entry) => ({ - ...entry, - type: 'error', - content: event.message, - renderMode: 'plain', - timestamp: new Date(), - })); - setIsProcessing(false); - }, - onToolCommand: (event) => { - toolCommandQueue.push(event.command); - tabs.appendWorkspaceEntry( - workspaceId, - createEntry({ type: 'command', content: event.command }), - ); - }, - onToolResult: (event) => { - const command = toolCommandQueue.shift(); - appendResolvedCommandResponse(workspaceId, command, event.response); - }, - onToolApprovalRequired: (event) => { - setPendingAgentApproval({ - approvalId: event.approvalId, - command: event.command, - requestId: event.requestId, - workspaceId: event.workspaceId, - title: event.title, - message: event.message, - }); - }, + onStreamItem: processStreamItem, } ); tabs.setWorkspaceSession(workspaceId, start.sessionId); } catch (error) { - tabs.updateWorkspaceEntry(workspaceId, responseEntry.id, (entry) => ({ + const entryId = activeTextEntryId ?? appendStreamEntry('response', 'plain'); + tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({ ...entry, type: 'error', content: error instanceof Error ? error.message : 'Chat stream failed.', diff --git a/MosaicIQ/src/components/Terminal/TerminalOutput.tsx b/MosaicIQ/src/components/Terminal/TerminalOutput.tsx index 17f23f3..4df489a 100644 --- a/MosaicIQ/src/components/Terminal/TerminalOutput.tsx +++ b/MosaicIQ/src/components/Terminal/TerminalOutput.tsx @@ -1,4 +1,4 @@ -import React, { useEffect } from 'react'; +import React, { useCallback, useEffect, useRef } from 'react'; import ReactMarkdown from 'react-markdown'; import remarkGfm from 'remark-gfm'; import { @@ -33,12 +33,66 @@ export const TerminalOutput: React.FC = ({ onRunCommand, onStartPortfolioAction, }) => { - // Auto-scroll to bottom when history changes - useEffect(() => { - if (outputRef.current) { - outputRef.current.scrollTop = outputRef.current.scrollHeight; + const contentRef = useRef(null); + const shouldStickToBottomRef = useRef(true); + + const isNearBottom = useCallback((element: HTMLDivElement) => { + const distanceFromBottom = + element.scrollHeight - element.scrollTop - element.clientHeight; + return distanceFromBottom < 48; + }, []); + + const scrollToBottom = useCallback(() => { + const element = outputRef.current; + if (!element || !shouldStickToBottomRef.current) { + return; } - }, [history, outputRef]); + + requestAnimationFrame(() => { + requestAnimationFrame(() => { + element.scrollTop = element.scrollHeight; + }); + }); + }, [outputRef]); + + useEffect(() => { + const element = outputRef.current; + if (!element) { + return; + } + + const handleScroll = () => { + shouldStickToBottomRef.current = isNearBottom(element); + }; + + handleScroll(); + element.addEventListener('scroll', handleScroll); + + return () => { + element.removeEventListener('scroll', handleScroll); + }; + }, [isNearBottom, outputRef]); + + useEffect(() => { + scrollToBottom(); + }, [history, scrollToBottom]); + + useEffect(() => { + const element = outputRef.current; + const content = contentRef.current; + if (!element || !content || typeof ResizeObserver === 'undefined') { + return; + } + + const observer = new ResizeObserver(() => { + if (shouldStickToBottomRef.current) { + element.scrollTop = element.scrollHeight; + } + }); + + observer.observe(content); + return () => observer.disconnect(); + }, [outputRef]); const renderPlainText = (content: string) => { const lines = content.split('\n'); @@ -145,6 +199,8 @@ export const TerminalOutput: React.FC = ({ return 'text-[#58a6ff]'; case 'system': return 'text-[#888888] italic'; + case 'thinking': + return 'text-[#8aa1bc]'; case 'error': return 'text-[#ff4757]'; case 'panel': @@ -163,6 +219,8 @@ export const TerminalOutput: React.FC = ({ return 'mb-2'; // Less space after commands case 'error': return 'mb-4'; // Moderate space for errors + case 'thinking': + return 'mb-3'; default: return 'mb-3'; // Default space } @@ -232,46 +290,61 @@ export const TerminalOutput: React.FC = ({ scrollbarColor: '#2a2a2a #111111' }} > - {history.map((entry) => ( -
- {/* Entry Header */} - {entry.type === 'command' && ( -
- {'>'} -
- {renderContent(entry)} +
+ {history.map((entry) => ( +
+ {/* Entry Header */} + {entry.type === 'command' && ( +
+ {'>'} +
+ {renderContent(entry)} +
-
- )} + )} - {entry.type !== 'command' && entry.type !== 'panel' && ( -
- {renderContent(entry)} -
- )} + {entry.type === 'thinking' && ( +
+
+ Thinking +
+
+ {renderContent(entry)} +
+
+ )} - {/* Render Panel */} - {entry.type === 'panel' && renderPanel(entry)} + {entry.type !== 'command' && + entry.type !== 'panel' && + entry.type !== 'thinking' && ( +
+ {renderContent(entry)} +
+ )} - {/* Timestamp - Selective display */} - {entry.timestamp && shouldShowTimestamp(entry) && ( -
- {entry.timestamp.toLocaleTimeString('en-US', { hour12: false })} -
- )} -
- ))} + {/* Render Panel */} + {entry.type === 'panel' && renderPanel(entry)} - {/* Empty State */} - {history.length === 0 && ( -
-
-
Terminal ready. Type a command or load /portfolio to open portfolio tools.
-
- )} + {/* Timestamp - Selective display */} + {entry.timestamp && shouldShowTimestamp(entry) && ( +
+ {entry.timestamp.toLocaleTimeString('en-US', { hour12: false })} +
+ )} +
+ ))} + + {/* Empty State */} + {history.length === 0 && ( +
+
+
Terminal ready. Type a command or load /portfolio to open portfolio tools.
+
+ )} +
); }; diff --git a/MosaicIQ/src/lib/terminalBridge.ts b/MosaicIQ/src/lib/terminalBridge.ts index 42b242d..b1cc672 100644 --- a/MosaicIQ/src/lib/terminalBridge.ts +++ b/MosaicIQ/src/lib/terminalBridge.ts @@ -2,12 +2,7 @@ import { invoke } from '@tauri-apps/api/core'; import { listen, type UnlistenFn } from '@tauri-apps/api/event'; import { NewsItem } from '../types/financial'; import { - AgentDeltaEvent, - AgentErrorEvent, - AgentResultEvent, - AgentToolApprovalRequiredEvent, - AgentToolCommandEvent, - AgentToolResultEvent, + AgentStreamItemEvent, ChatStreamStart, LookupCompanyRequest, ExecuteTerminalCommandRequest, @@ -16,19 +11,15 @@ import { ResolvedTerminalCommandResponse, StartChatStreamRequest, TerminalCommandResponse, - TransportAgentToolResultEvent, TransportPanelPayload, } from '../types/terminal'; import { Company } from '../types/financial'; interface StreamCallbacks { workspaceId: string; - onDelta: (event: AgentDeltaEvent) => void; - onResult: (event: AgentResultEvent) => void; - onError: (event: AgentErrorEvent) => void; - onToolCommand: (event: AgentToolCommandEvent) => void; - onToolResult: (event: AgentToolResultEvent) => void; - onToolApprovalRequired: (event: AgentToolApprovalRequiredEvent) => void; + onStreamItem: (event: Omit & { + response?: ResolvedTerminalCommandResponse; + }) => void; } const deserializePanelPayload = (payload: TransportPanelPayload): PanelPayload => { @@ -72,53 +63,23 @@ class TerminalBridge { } this.listenersReady = Promise.all([ - // Route incremental stream events back to the workspace that initiated the request. - listen('agent_delta', (event) => { + listen('agent_stream_item', (event) => { const callbacks = this.streamCallbacks.get(event.payload.requestId); if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) { return; } - callbacks.onDelta(event.payload); - }), - listen('agent_result', (event) => { - const callbacks = this.streamCallbacks.get(event.payload.requestId); - if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) { - return; - } - callbacks.onResult(event.payload); - this.streamCallbacks.delete(event.payload.requestId); - }), - listen('agent_error', (event) => { - const callbacks = this.streamCallbacks.get(event.payload.requestId); - if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) { - return; - } - callbacks.onError(event.payload); - this.streamCallbacks.delete(event.payload.requestId); - }), - listen('agent_tool_command', (event) => { - const callbacks = this.streamCallbacks.get(event.payload.requestId); - if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) { - return; - } - callbacks.onToolCommand(event.payload); - }), - listen('agent_tool_result', (event) => { - const callbacks = this.streamCallbacks.get(event.payload.requestId); - if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) { - return; - } - callbacks.onToolResult({ + callbacks.onStreamItem({ ...event.payload, - response: deserializeTerminalCommandResponse(event.payload.response), + response: event.payload.response + ? deserializeTerminalCommandResponse(event.payload.response) + : undefined, }); - }), - listen('agent_tool_approval_required', (event) => { - const callbacks = this.streamCallbacks.get(event.payload.requestId); - if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) { - return; + if ( + event.payload.kind === 'stream_complete' || + event.payload.kind === 'error' + ) { + this.streamCallbacks.delete(event.payload.requestId); } - callbacks.onToolApprovalRequired(event.payload); }), ]).then((unlistenFns) => { this.unlistenFns = unlistenFns; diff --git a/MosaicIQ/src/types/terminal.ts b/MosaicIQ/src/types/terminal.ts index 7ad1b9d..6f7a8ca 100644 --- a/MosaicIQ/src/types/terminal.ts +++ b/MosaicIQ/src/types/terminal.ts @@ -75,61 +75,33 @@ export interface ResolveAgentToolApprovalRequest { approved: boolean; } -export interface AgentDeltaEvent { - workspaceId: string; - requestId: string; - sessionId: string; - delta: string; -} +export type AgentStreamItemKind = + | 'reasoning_delta' + | 'text_delta' + | 'tool_command' + | 'tool_result' + | 'approval_required' + | 'stream_complete' + | 'error'; -export interface AgentResultEvent { +export interface AgentStreamItemEvent { workspaceId: string; requestId: string; sessionId: string; - reply: string; -} - -export interface AgentErrorEvent { - workspaceId: string; - requestId: string; - sessionId: string; - message: string; -} - -export interface AgentToolCommandEvent { - workspaceId: string; - requestId: string; - sessionId: string; - command: string; -} - -export interface TransportAgentToolResultEvent { - workspaceId: string; - requestId: string; - sessionId: string; - response: TerminalCommandResponse; -} - -export interface AgentToolResultEvent { - workspaceId: string; - requestId: string; - sessionId: string; - response: ResolvedTerminalCommandResponse; -} - -export interface AgentToolApprovalRequiredEvent { - workspaceId: string; - requestId: string; - sessionId: string; - approvalId: string; - command: string; - title: string; - message: string; + sequence: number; + kind: AgentStreamItemKind; + delta?: string; + command?: string; + response?: TerminalCommandResponse; + approvalId?: string; + title?: string; + message?: string; + errorMessage?: string; } export interface TerminalEntry { id: string; - type: 'command' | 'response' | 'system' | 'error' | 'panel'; + type: 'command' | 'response' | 'system' | 'error' | 'panel' | 'thinking'; content: string | PanelPayload; renderMode?: 'plain' | 'markdown'; timestamp?: Date;