Order terminal agent transcript by streamed events
This commit is contained in:
@@ -1,7 +1,6 @@
|
|||||||
use std::pin::Pin;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::{future::BoxFuture, Stream, StreamExt};
|
use futures::{future::BoxFuture, StreamExt};
|
||||||
use rig::{
|
use rig::{
|
||||||
agent::MultiTurnStreamItem,
|
agent::MultiTurnStreamItem,
|
||||||
client::completion::CompletionClient,
|
client::completion::CompletionClient,
|
||||||
@@ -10,9 +9,8 @@ use rig::{
|
|||||||
providers::openai,
|
providers::openai,
|
||||||
streaming::{StreamedAssistantContent, StreamingPrompt},
|
streaming::{StreamedAssistantContent, StreamingPrompt},
|
||||||
};
|
};
|
||||||
use tauri::{AppHandle, Wry};
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
|
use crate::agent::stream_events::AgentStreamEmitter;
|
||||||
use crate::agent::tools::terminal_command::{AgentCommandExecutor, RunTerminalCommandTool};
|
use crate::agent::tools::terminal_command::{AgentCommandExecutor, RunTerminalCommandTool};
|
||||||
use crate::agent::AgentRuntimeConfig;
|
use crate::agent::AgentRuntimeConfig;
|
||||||
use crate::error::AppError;
|
use crate::error::AppError;
|
||||||
@@ -21,17 +19,12 @@ use crate::state::PendingAgentToolApprovals;
|
|||||||
const SYSTEM_PROMPT: &str = "You are MosaicIQ's terminal chat assistant. Answer concisely in plain text. Use the available terminal command tool whenever current workspace data or live MosaicIQ terminal actions would improve the answer. Never claim to have run a command unless the tool actually ran it. If the request is unclear, ask a short clarifying question.";
|
const SYSTEM_PROMPT: &str = "You are MosaicIQ's terminal chat assistant. Answer concisely in plain text. Use the available terminal command tool whenever current workspace data or live MosaicIQ terminal actions would improve the answer. Never claim to have run a command unless the tool actually ran it. If the request is unclear, ask a short clarifying question.";
|
||||||
const MAX_TOOL_TURNS: usize = 4;
|
const MAX_TOOL_TURNS: usize = 4;
|
||||||
|
|
||||||
/// Streaming text output from the upstream chat provider.
|
|
||||||
pub type ChatGatewayStream = Pin<Box<dyn Stream<Item = Result<String, AppError>> + Send>>;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AgentToolRuntimeContext {
|
pub struct AgentToolRuntimeContext {
|
||||||
pub app_handle: AppHandle<Wry>,
|
pub stream_emitter: Arc<AgentStreamEmitter<tauri::Wry>>,
|
||||||
pub command_executor: Arc<dyn AgentCommandExecutor>,
|
pub command_executor: Arc<dyn AgentCommandExecutor>,
|
||||||
pub pending_approvals: Arc<PendingAgentToolApprovals>,
|
pub pending_approvals: Arc<PendingAgentToolApprovals>,
|
||||||
pub workspace_id: String,
|
pub workspace_id: String,
|
||||||
pub request_id: String,
|
|
||||||
pub session_id: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait used by the agent service so tests can inject a deterministic gateway.
|
/// Trait used by the agent service so tests can inject a deterministic gateway.
|
||||||
@@ -44,7 +37,7 @@ pub trait ChatGateway: Clone + Send + Sync + 'static {
|
|||||||
context_messages: Vec<Message>,
|
context_messages: Vec<Message>,
|
||||||
history: Vec<Message>,
|
history: Vec<Message>,
|
||||||
tool_runtime: AgentToolRuntimeContext,
|
tool_runtime: AgentToolRuntimeContext,
|
||||||
) -> BoxFuture<'static, Result<ChatGatewayStream, AppError>>;
|
) -> BoxFuture<'static, Result<String, AppError>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Production Rig-backed gateway using the OpenAI-compatible chat completions API.
|
/// Production Rig-backed gateway using the OpenAI-compatible chat completions API.
|
||||||
@@ -59,7 +52,7 @@ impl ChatGateway for RigChatGateway {
|
|||||||
context_messages: Vec<Message>,
|
context_messages: Vec<Message>,
|
||||||
history: Vec<Message>,
|
history: Vec<Message>,
|
||||||
tool_runtime: AgentToolRuntimeContext,
|
tool_runtime: AgentToolRuntimeContext,
|
||||||
) -> BoxFuture<'static, Result<ChatGatewayStream, AppError>> {
|
) -> BoxFuture<'static, Result<String, AppError>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let api_key = runtime.api_key.unwrap_or_default();
|
let api_key = runtime.api_key.unwrap_or_default();
|
||||||
let client = openai::CompletionsClient::builder()
|
let client = openai::CompletionsClient::builder()
|
||||||
@@ -70,12 +63,10 @@ impl ChatGateway for RigChatGateway {
|
|||||||
|
|
||||||
let history = compose_request_messages(context_messages, history);
|
let history = compose_request_messages(context_messages, history);
|
||||||
let tool = RunTerminalCommandTool {
|
let tool = RunTerminalCommandTool {
|
||||||
app_handle: tool_runtime.app_handle,
|
stream_emitter: tool_runtime.stream_emitter.clone(),
|
||||||
command_executor: tool_runtime.command_executor,
|
command_executor: tool_runtime.command_executor,
|
||||||
pending_approvals: tool_runtime.pending_approvals,
|
pending_approvals: tool_runtime.pending_approvals,
|
||||||
workspace_id: tool_runtime.workspace_id,
|
workspace_id: tool_runtime.workspace_id,
|
||||||
request_id: tool_runtime.request_id,
|
|
||||||
session_id: tool_runtime.session_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut rig_stream = client
|
let mut rig_stream = client
|
||||||
@@ -91,9 +82,9 @@ impl ChatGateway for RigChatGateway {
|
|||||||
.multi_turn(MAX_TOOL_TURNS)
|
.multi_turn(MAX_TOOL_TURNS)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::unbounded_channel::<Result<String, AppError>>();
|
let mut reply = String::new();
|
||||||
tauri::async_runtime::spawn(async move {
|
|
||||||
let mut saw_text = false;
|
let mut saw_text = false;
|
||||||
|
let mut saw_reasoning_delta = false;
|
||||||
|
|
||||||
while let Some(item) = rig_stream.next().await {
|
while let Some(item) = rig_stream.next().await {
|
||||||
match item {
|
match item {
|
||||||
@@ -101,31 +92,50 @@ impl ChatGateway for RigChatGateway {
|
|||||||
StreamedAssistantContent::Text(text),
|
StreamedAssistantContent::Text(text),
|
||||||
)) => {
|
)) => {
|
||||||
saw_text = true;
|
saw_text = true;
|
||||||
if sender.send(Ok(text.text)).is_err() {
|
reply.push_str(&text.text);
|
||||||
return;
|
tool_runtime.stream_emitter.text_delta(text.text)?;
|
||||||
}
|
}
|
||||||
|
Ok(MultiTurnStreamItem::StreamAssistantItem(
|
||||||
|
StreamedAssistantContent::Reasoning(reasoning),
|
||||||
|
)) => {
|
||||||
|
if saw_reasoning_delta {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let text = reasoning_text(&reasoning);
|
||||||
|
if text.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tool_runtime.stream_emitter.reasoning_delta(text)?;
|
||||||
|
}
|
||||||
|
Ok(MultiTurnStreamItem::StreamAssistantItem(
|
||||||
|
StreamedAssistantContent::ReasoningDelta { reasoning, .. },
|
||||||
|
)) => {
|
||||||
|
saw_reasoning_delta = true;
|
||||||
|
tool_runtime.stream_emitter.reasoning_delta(reasoning)?;
|
||||||
|
}
|
||||||
|
Ok(MultiTurnStreamItem::StreamAssistantItem(
|
||||||
|
StreamedAssistantContent::ToolCall { .. },
|
||||||
|
)) => {}
|
||||||
|
Ok(MultiTurnStreamItem::StreamAssistantItem(
|
||||||
|
StreamedAssistantContent::ToolCallDelta { .. },
|
||||||
|
)) => {}
|
||||||
|
Ok(MultiTurnStreamItem::StreamUserItem(_)) => {}
|
||||||
Ok(MultiTurnStreamItem::FinalResponse(final_response)) => {
|
Ok(MultiTurnStreamItem::FinalResponse(final_response)) => {
|
||||||
if !saw_text && !final_response.response().is_empty() {
|
if !saw_text && !final_response.response().is_empty() {
|
||||||
let _ = sender.send(Ok(final_response.response().to_string()));
|
reply.push_str(final_response.response());
|
||||||
|
tool_runtime
|
||||||
|
.stream_emitter
|
||||||
|
.text_delta(final_response.response().to_string())?;
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(error) => {
|
Err(error) => return Err(map_streaming_error(error)),
|
||||||
let _ = sender.send(Err(map_streaming_error(error)));
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let stream = futures::stream::unfold(receiver, |mut receiver| async move {
|
Ok(reply)
|
||||||
receiver.recv().await.map(|item| (item, receiver))
|
|
||||||
});
|
|
||||||
|
|
||||||
let stream: ChatGatewayStream = Box::pin(stream);
|
|
||||||
Ok(stream)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -137,6 +147,22 @@ fn compose_request_messages(
|
|||||||
context_messages.into_iter().chain(history).collect()
|
context_messages.into_iter().chain(history).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn reasoning_text(reasoning: &rig::message::Reasoning) -> String {
|
||||||
|
use rig::message::ReasoningContent;
|
||||||
|
|
||||||
|
reasoning
|
||||||
|
.content
|
||||||
|
.iter()
|
||||||
|
.filter_map(|block| match block {
|
||||||
|
ReasoningContent::Text { text, .. } => Some(text.as_str()),
|
||||||
|
ReasoningContent::Summary(text) => Some(text.as_str()),
|
||||||
|
ReasoningContent::Encrypted(_) | ReasoningContent::Redacted { .. } => None,
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n")
|
||||||
|
}
|
||||||
|
|
||||||
fn map_streaming_error(error: rig::agent::StreamingError) -> AppError {
|
fn map_streaming_error(error: rig::agent::StreamingError) -> AppError {
|
||||||
AppError::ProviderRequest(error.to_string())
|
AppError::ProviderRequest(error.to_string())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,17 +5,18 @@ mod panel_context;
|
|||||||
mod routing;
|
mod routing;
|
||||||
mod service;
|
mod service;
|
||||||
mod settings;
|
mod settings;
|
||||||
|
mod stream_events;
|
||||||
mod tools;
|
mod tools;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
pub use gateway::{AgentToolRuntimeContext, ChatGateway, RigChatGateway};
|
pub use gateway::{AgentToolRuntimeContext, ChatGateway, RigChatGateway};
|
||||||
pub use service::AgentService;
|
pub use service::AgentService;
|
||||||
pub(crate) use settings::AgentSettingsService;
|
pub(crate) use settings::AgentSettingsService;
|
||||||
|
pub use stream_events::AgentStreamEmitter;
|
||||||
pub use types::{
|
pub use types::{
|
||||||
default_task_defaults, AgentConfigStatus, AgentDeltaEvent, AgentErrorEvent, AgentResultEvent,
|
default_task_defaults, AgentConfigStatus, AgentRuntimeConfig, AgentStoredSettings,
|
||||||
AgentRuntimeConfig, AgentStoredSettings, AgentTaskRoute, AgentToolApprovalRequiredEvent,
|
AgentStreamItemEvent, AgentStreamItemKind, AgentTaskRoute, ChatPanelContext,
|
||||||
AgentToolCommandEvent, AgentToolResultEvent, ChatPanelContext, ChatPromptRequest,
|
ChatPromptRequest, ChatStreamStart, PreparedChatTurn, RemoteProviderSettings,
|
||||||
ChatStreamStart, PreparedChatTurn, RemoteProviderSettings,
|
|
||||||
ResolveAgentToolApprovalRequest, SaveAgentRuntimeConfigRequest, TaskProfile,
|
ResolveAgentToolApprovalRequest, SaveAgentRuntimeConfigRequest, TaskProfile,
|
||||||
UpdateRemoteApiKeyRequest, AGENT_SETTINGS_STORE_PATH, DEFAULT_REMOTE_BASE_URL,
|
UpdateRemoteApiKeyRequest, AGENT_SETTINGS_STORE_PATH, DEFAULT_REMOTE_BASE_URL,
|
||||||
DEFAULT_REMOTE_MODEL,
|
DEFAULT_REMOTE_MODEL,
|
||||||
|
|||||||
143
MosaicIQ/src-tauri/src/agent/stream_events.rs
Normal file
143
MosaicIQ/src-tauri/src/agent/stream_events.rs
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
|
||||||
|
use tauri::{AppHandle, Emitter, Runtime};
|
||||||
|
|
||||||
|
use crate::agent::{AgentStreamItemEvent, AgentStreamItemKind};
|
||||||
|
use crate::error::AppError;
|
||||||
|
use crate::terminal::TerminalCommandResponse;
|
||||||
|
|
||||||
|
pub struct AgentStreamEmitter<R: Runtime> {
|
||||||
|
app_handle: AppHandle<R>,
|
||||||
|
workspace_id: String,
|
||||||
|
request_id: String,
|
||||||
|
session_id: String,
|
||||||
|
next_sequence: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Runtime> AgentStreamEmitter<R> {
|
||||||
|
pub fn new(
|
||||||
|
app_handle: AppHandle<R>,
|
||||||
|
workspace_id: String,
|
||||||
|
request_id: String,
|
||||||
|
session_id: String,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
app_handle,
|
||||||
|
workspace_id,
|
||||||
|
request_id,
|
||||||
|
session_id,
|
||||||
|
next_sequence: AtomicU64::new(1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn emit(&self, event: AgentStreamItemEvent) -> Result<(), AppError> {
|
||||||
|
self.app_handle
|
||||||
|
.emit("agent_stream_item", event)
|
||||||
|
.map_err(|error| AppError::ProviderRequest(error.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reasoning_delta(&self, delta: String) -> Result<(), AppError> {
|
||||||
|
if delta.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.emit(self.event(AgentStreamItemKind::ReasoningDelta).with_delta(delta))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn text_delta(&self, delta: String) -> Result<(), AppError> {
|
||||||
|
if delta.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.emit(self.event(AgentStreamItemKind::TextDelta).with_delta(delta))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tool_command(&self, command: String) -> Result<(), AppError> {
|
||||||
|
self.emit(self.event(AgentStreamItemKind::ToolCommand).with_command(command))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tool_result(
|
||||||
|
&self,
|
||||||
|
command: String,
|
||||||
|
response: TerminalCommandResponse,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
self.emit(
|
||||||
|
self.event(AgentStreamItemKind::ToolResult)
|
||||||
|
.with_command(command)
|
||||||
|
.with_response(response),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn approval_required(
|
||||||
|
&self,
|
||||||
|
approval_id: String,
|
||||||
|
command: String,
|
||||||
|
title: String,
|
||||||
|
message: String,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
self.emit(
|
||||||
|
self.event(AgentStreamItemKind::ApprovalRequired)
|
||||||
|
.with_approval(approval_id, title, message)
|
||||||
|
.with_command(command),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn stream_complete(&self) -> Result<(), AppError> {
|
||||||
|
self.emit(self.event(AgentStreamItemKind::StreamComplete))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn error(&self, message: String) -> Result<(), AppError> {
|
||||||
|
self.emit(self.event(AgentStreamItemKind::Error).with_error(message))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn session_id(&self) -> &str {
|
||||||
|
&self.session_id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn event(&self, kind: AgentStreamItemKind) -> AgentStreamItemEvent {
|
||||||
|
AgentStreamItemEvent {
|
||||||
|
workspace_id: self.workspace_id.clone(),
|
||||||
|
request_id: self.request_id.clone(),
|
||||||
|
session_id: self.session_id.clone(),
|
||||||
|
sequence: self.next_sequence.fetch_add(1, Ordering::Relaxed),
|
||||||
|
kind,
|
||||||
|
delta: None,
|
||||||
|
command: None,
|
||||||
|
response: None,
|
||||||
|
approval_id: None,
|
||||||
|
title: None,
|
||||||
|
message: None,
|
||||||
|
error_message: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AgentStreamItemEvent {
|
||||||
|
fn with_delta(mut self, delta: String) -> Self {
|
||||||
|
self.delta = Some(delta);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_command(mut self, command: String) -> Self {
|
||||||
|
self.command = Some(command);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_response(mut self, response: TerminalCommandResponse) -> Self {
|
||||||
|
self.response = Some(response);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_approval(mut self, approval_id: String, title: String, message: String) -> Self {
|
||||||
|
self.approval_id = Some(approval_id);
|
||||||
|
self.title = Some(title);
|
||||||
|
self.message = Some(message);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_error(mut self, error_message: String) -> Self {
|
||||||
|
self.error_message = Some(error_message);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@@ -7,13 +7,11 @@ use rig::completion::ToolDefinition;
|
|||||||
use rig::tool::Tool;
|
use rig::tool::Tool;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use tauri::{AppHandle, Emitter, Runtime};
|
use tauri::Runtime;
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
|
|
||||||
|
use crate::agent::stream_events::AgentStreamEmitter;
|
||||||
use crate::agent::panel_context::{compact_panel_payload, panel_type};
|
use crate::agent::panel_context::{compact_panel_payload, panel_type};
|
||||||
use crate::agent::{
|
|
||||||
AgentToolApprovalRequiredEvent, AgentToolCommandEvent, AgentToolResultEvent,
|
|
||||||
};
|
|
||||||
use crate::error::AppError;
|
use crate::error::AppError;
|
||||||
use crate::state::PendingAgentToolApprovals;
|
use crate::state::PendingAgentToolApprovals;
|
||||||
use crate::terminal::{
|
use crate::terminal::{
|
||||||
@@ -40,12 +38,10 @@ impl AgentCommandExecutor for TerminalCommandService {
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RunTerminalCommandTool<R: Runtime> {
|
pub struct RunTerminalCommandTool<R: Runtime> {
|
||||||
pub app_handle: AppHandle<R>,
|
pub stream_emitter: Arc<AgentStreamEmitter<R>>,
|
||||||
pub command_executor: Arc<dyn AgentCommandExecutor>,
|
pub command_executor: Arc<dyn AgentCommandExecutor>,
|
||||||
pub pending_approvals: Arc<PendingAgentToolApprovals>,
|
pub pending_approvals: Arc<PendingAgentToolApprovals>,
|
||||||
pub workspace_id: String,
|
pub workspace_id: String,
|
||||||
pub request_id: String,
|
|
||||||
pub session_id: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
@@ -57,8 +53,6 @@ pub struct RunTerminalCommandArgs {
|
|||||||
pub enum RunTerminalCommandToolError {
|
pub enum RunTerminalCommandToolError {
|
||||||
#[error("{0}")]
|
#[error("{0}")]
|
||||||
App(#[from] AppError),
|
App(#[from] AppError),
|
||||||
#[error("failed to emit terminal event: {0}")]
|
|
||||||
Emit(String),
|
|
||||||
#[error("failed to serialize tool result: {0}")]
|
#[error("failed to serialize tool result: {0}")]
|
||||||
Serialize(String),
|
Serialize(String),
|
||||||
}
|
}
|
||||||
@@ -127,63 +121,39 @@ impl<R: Runtime> Tool for RunTerminalCommandTool<R> {
|
|||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
self.emit_result(response.clone())?;
|
self.emit_result(&command, response.clone())?;
|
||||||
serialize_response_tool_result(command, response)
|
serialize_response_tool_result(command, response)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Runtime> RunTerminalCommandTool<R> {
|
impl<R: Runtime> RunTerminalCommandTool<R> {
|
||||||
fn emit_command(&self, command: &str) -> Result<(), RunTerminalCommandToolError> {
|
fn emit_command(&self, command: &str) -> Result<(), RunTerminalCommandToolError> {
|
||||||
self.app_handle
|
self.stream_emitter.tool_command(command.to_string())?;
|
||||||
.emit(
|
Ok(())
|
||||||
"agent_tool_command",
|
|
||||||
AgentToolCommandEvent {
|
|
||||||
workspace_id: self.workspace_id.clone(),
|
|
||||||
request_id: self.request_id.clone(),
|
|
||||||
session_id: self.session_id.clone(),
|
|
||||||
command: command.to_string(),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.map_err(|error| RunTerminalCommandToolError::Emit(error.to_string()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn emit_result(
|
fn emit_result(
|
||||||
&self,
|
&self,
|
||||||
|
command: &str,
|
||||||
response: TerminalCommandResponse,
|
response: TerminalCommandResponse,
|
||||||
) -> Result<(), RunTerminalCommandToolError> {
|
) -> Result<(), RunTerminalCommandToolError> {
|
||||||
self.app_handle
|
self.stream_emitter
|
||||||
.emit(
|
.tool_result(command.to_string(), response)?;
|
||||||
"agent_tool_result",
|
Ok(())
|
||||||
AgentToolResultEvent {
|
|
||||||
workspace_id: self.workspace_id.clone(),
|
|
||||||
request_id: self.request_id.clone(),
|
|
||||||
session_id: self.session_id.clone(),
|
|
||||||
response,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.map_err(|error| RunTerminalCommandToolError::Emit(error.to_string()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn await_approval(&self, command: &str) -> Result<bool, RunTerminalCommandToolError> {
|
async fn await_approval(&self, command: &str) -> Result<bool, RunTerminalCommandToolError> {
|
||||||
let (approval_id, receiver) = self.pending_approvals.register()?;
|
let (approval_id, receiver) = self.pending_approvals.register()?;
|
||||||
|
|
||||||
self.app_handle
|
self.stream_emitter.approval_required(
|
||||||
.emit(
|
approval_id.clone(),
|
||||||
"agent_tool_approval_required",
|
command.to_string(),
|
||||||
AgentToolApprovalRequiredEvent {
|
"Approve portfolio command".to_string(),
|
||||||
workspace_id: self.workspace_id.clone(),
|
format!(
|
||||||
request_id: self.request_id.clone(),
|
|
||||||
session_id: self.session_id.clone(),
|
|
||||||
approval_id: approval_id.clone(),
|
|
||||||
command: command.to_string(),
|
|
||||||
title: "Approve portfolio command".to_string(),
|
|
||||||
message: format!(
|
|
||||||
"The agent wants to run a portfolio-changing command:\n\n{}\n\nApprove this action to continue.",
|
"The agent wants to run a portfolio-changing command:\n\n{}\n\nApprove this action to continue.",
|
||||||
command
|
command
|
||||||
),
|
),
|
||||||
},
|
)?;
|
||||||
)
|
|
||||||
.map_err(|error| RunTerminalCommandToolError::Emit(error.to_string()))?;
|
|
||||||
|
|
||||||
let result = timeout(APPROVAL_TIMEOUT, receiver).await;
|
let result = timeout(APPROVAL_TIMEOUT, receiver).await;
|
||||||
match result {
|
match result {
|
||||||
@@ -287,6 +257,7 @@ mod tests {
|
|||||||
command_name, is_allowed_agent_command, is_write_command, normalize_command,
|
command_name, is_allowed_agent_command, is_write_command, normalize_command,
|
||||||
AgentCommandExecutor, RunTerminalCommandArgs, RunTerminalCommandTool,
|
AgentCommandExecutor, RunTerminalCommandArgs, RunTerminalCommandTool,
|
||||||
};
|
};
|
||||||
|
use crate::agent::stream_events::AgentStreamEmitter;
|
||||||
use crate::state::PendingAgentToolApprovals;
|
use crate::state::PendingAgentToolApprovals;
|
||||||
use crate::terminal::{
|
use crate::terminal::{
|
||||||
Company, ExecuteTerminalCommandRequest, PanelPayload, TerminalCommandResponse,
|
Company, ExecuteTerminalCommandRequest, PanelPayload, TerminalCommandResponse,
|
||||||
@@ -352,12 +323,15 @@ mod tests {
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
let tool = RunTerminalCommandTool {
|
let tool = RunTerminalCommandTool {
|
||||||
app_handle: app.handle().clone(),
|
stream_emitter: Arc::new(AgentStreamEmitter::new(
|
||||||
|
app.handle().clone(),
|
||||||
|
"workspace-1".to_string(),
|
||||||
|
"request-1".to_string(),
|
||||||
|
"session-1".to_string(),
|
||||||
|
)),
|
||||||
command_executor: executor.clone(),
|
command_executor: executor.clone(),
|
||||||
pending_approvals: Arc::new(PendingAgentToolApprovals::new()),
|
pending_approvals: Arc::new(PendingAgentToolApprovals::new()),
|
||||||
workspace_id: "workspace-1".to_string(),
|
workspace_id: "workspace-1".to_string(),
|
||||||
request_id: "request-1".to_string(),
|
|
||||||
session_id: "session-1".to_string(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = tool
|
let result = tool
|
||||||
@@ -382,12 +356,15 @@ mod tests {
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
let tool = RunTerminalCommandTool {
|
let tool = RunTerminalCommandTool {
|
||||||
app_handle: app.handle().clone(),
|
stream_emitter: Arc::new(AgentStreamEmitter::new(
|
||||||
|
app.handle().clone(),
|
||||||
|
"workspace-1".to_string(),
|
||||||
|
"request-1".to_string(),
|
||||||
|
"session-1".to_string(),
|
||||||
|
)),
|
||||||
command_executor: executor.clone(),
|
command_executor: executor.clone(),
|
||||||
pending_approvals: approvals.clone(),
|
pending_approvals: approvals.clone(),
|
||||||
workspace_id: "workspace-1".to_string(),
|
workspace_id: "workspace-1".to_string(),
|
||||||
request_id: "request-1".to_string(),
|
|
||||||
session_id: "session-1".to_string(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let approvals_for_task = approvals.clone();
|
let approvals_for_task = approvals.clone();
|
||||||
@@ -417,12 +394,15 @@ mod tests {
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
let tool = RunTerminalCommandTool {
|
let tool = RunTerminalCommandTool {
|
||||||
app_handle: app.handle().clone(),
|
stream_emitter: Arc::new(AgentStreamEmitter::new(
|
||||||
|
app.handle().clone(),
|
||||||
|
"workspace-1".to_string(),
|
||||||
|
"request-1".to_string(),
|
||||||
|
"session-1".to_string(),
|
||||||
|
)),
|
||||||
command_executor: executor.clone(),
|
command_executor: executor.clone(),
|
||||||
pending_approvals: approvals.clone(),
|
pending_approvals: approvals.clone(),
|
||||||
workspace_id: "workspace-1".to_string(),
|
workspace_id: "workspace-1".to_string(),
|
||||||
request_id: "request-1".to_string(),
|
|
||||||
session_id: "session-1".to_string(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let approvals_for_task = approvals.clone();
|
let approvals_for_task = approvals.clone();
|
||||||
|
|||||||
@@ -69,67 +69,40 @@ pub struct ChatStreamStart {
|
|||||||
pub session_id: String,
|
pub session_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Incremental delta emitted while the backend streams a reply.
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[serde(rename_all = "snake_case")]
|
||||||
#[serde(rename_all = "camelCase")]
|
pub enum AgentStreamItemKind {
|
||||||
pub struct AgentDeltaEvent {
|
ReasoningDelta,
|
||||||
pub workspace_id: String,
|
TextDelta,
|
||||||
pub request_id: String,
|
ToolCommand,
|
||||||
pub session_id: String,
|
ToolResult,
|
||||||
pub delta: String,
|
ApprovalRequired,
|
||||||
|
StreamComplete,
|
||||||
|
Error,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Final reply emitted when the backend completes a stream.
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct AgentResultEvent {
|
pub struct AgentStreamItemEvent {
|
||||||
pub workspace_id: String,
|
pub workspace_id: String,
|
||||||
pub request_id: String,
|
pub request_id: String,
|
||||||
pub session_id: String,
|
pub session_id: String,
|
||||||
pub reply: String,
|
pub sequence: u64,
|
||||||
}
|
pub kind: AgentStreamItemKind,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
/// Event emitted when the agent decides to run a terminal command tool.
|
pub delta: Option<String>,
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
#[serde(rename_all = "camelCase")]
|
pub command: Option<String>,
|
||||||
pub struct AgentToolCommandEvent {
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub workspace_id: String,
|
pub response: Option<TerminalCommandResponse>,
|
||||||
pub request_id: String,
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub session_id: String,
|
pub approval_id: Option<String>,
|
||||||
pub command: String,
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
}
|
pub title: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
/// Event emitted after an agent-triggered command completes.
|
pub message: Option<String>,
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
#[serde(rename_all = "camelCase")]
|
pub error_message: Option<String>,
|
||||||
pub struct AgentToolResultEvent {
|
|
||||||
pub workspace_id: String,
|
|
||||||
pub request_id: String,
|
|
||||||
pub session_id: String,
|
|
||||||
pub response: TerminalCommandResponse,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Event emitted when the agent requests approval for a mutating command.
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct AgentToolApprovalRequiredEvent {
|
|
||||||
pub workspace_id: String,
|
|
||||||
pub request_id: String,
|
|
||||||
pub session_id: String,
|
|
||||||
pub approval_id: String,
|
|
||||||
pub command: String,
|
|
||||||
pub title: String,
|
|
||||||
pub message: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Error event emitted when the backend cannot complete a stream.
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct AgentErrorEvent {
|
|
||||||
pub workspace_id: String,
|
|
||||||
pub request_id: String,
|
|
||||||
pub session_id: String,
|
|
||||||
pub message: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Frontend request payload for approving or denying an agent-triggered write command.
|
/// Frontend request payload for approving or denying an agent-triggered write command.
|
||||||
|
|||||||
@@ -1,11 +1,10 @@
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::StreamExt;
|
use tauri::Manager;
|
||||||
use tauri::{Emitter, Manager};
|
|
||||||
|
|
||||||
use crate::agent::{
|
use crate::agent::{
|
||||||
AgentDeltaEvent, AgentErrorEvent, AgentResultEvent, AgentToolRuntimeContext, ChatGateway,
|
AgentStreamEmitter, AgentToolRuntimeContext, ChatGateway, ChatPromptRequest, ChatStreamStart,
|
||||||
ChatPromptRequest, ChatStreamStart, ResolveAgentToolApprovalRequest,
|
ResolveAgentToolApprovalRequest,
|
||||||
};
|
};
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
use crate::terminal::{
|
use crate::terminal::{
|
||||||
@@ -59,89 +58,46 @@ pub async fn start_chat_stream(
|
|||||||
let app_handle = app.clone();
|
let app_handle = app.clone();
|
||||||
let command_executor = state.command_service.clone();
|
let command_executor = state.command_service.clone();
|
||||||
let pending_approvals = state.pending_agent_tool_approvals.clone();
|
let pending_approvals = state.pending_agent_tool_approvals.clone();
|
||||||
|
let stream_emitter = std::sync::Arc::new(AgentStreamEmitter::new(
|
||||||
|
app_handle.clone(),
|
||||||
|
prepared_turn.workspace_id.clone(),
|
||||||
|
request_id.clone(),
|
||||||
|
prepared_turn.session_id.clone(),
|
||||||
|
));
|
||||||
tauri::async_runtime::spawn(async move {
|
tauri::async_runtime::spawn(async move {
|
||||||
tokio::time::sleep(Duration::from_millis(30)).await;
|
tokio::time::sleep(Duration::from_millis(30)).await;
|
||||||
|
|
||||||
// Resolve the upstream stream outside the mutex so long-running provider I/O
|
// Resolve the upstream stream outside the mutex so long-running provider I/O
|
||||||
// does not block other settings reads or chat requests.
|
// does not block other settings reads or chat requests.
|
||||||
let mut stream = match gateway
|
let reply = match gateway
|
||||||
.stream_chat(
|
.stream_chat(
|
||||||
prepared_turn.runtime.clone(),
|
prepared_turn.runtime.clone(),
|
||||||
prepared_turn.prompt.clone(),
|
prepared_turn.prompt.clone(),
|
||||||
prepared_turn.context_messages.clone(),
|
prepared_turn.context_messages.clone(),
|
||||||
prepared_turn.history.clone(),
|
prepared_turn.history.clone(),
|
||||||
AgentToolRuntimeContext {
|
AgentToolRuntimeContext {
|
||||||
app_handle: app_handle.clone(),
|
stream_emitter: stream_emitter.clone(),
|
||||||
command_executor,
|
command_executor,
|
||||||
pending_approvals,
|
pending_approvals,
|
||||||
workspace_id: prepared_turn.workspace_id.clone(),
|
workspace_id: prepared_turn.workspace_id.clone(),
|
||||||
request_id: request_id.clone(),
|
|
||||||
session_id: prepared_turn.session_id.clone(),
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(stream) => stream,
|
Ok(reply) => reply,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
let _ = app_handle.emit(
|
let _ = stream_emitter.error(error.to_string());
|
||||||
"agent_error",
|
|
||||||
AgentErrorEvent {
|
|
||||||
workspace_id: prepared_turn.workspace_id,
|
|
||||||
request_id,
|
|
||||||
session_id: prepared_turn.session_id,
|
|
||||||
message: error.to_string(),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut reply = String::new();
|
|
||||||
|
|
||||||
while let Some(chunk) = stream.next().await {
|
|
||||||
match chunk {
|
|
||||||
Ok(delta) => {
|
|
||||||
reply.push_str(&delta);
|
|
||||||
let _ = app_handle.emit(
|
|
||||||
"agent_delta",
|
|
||||||
AgentDeltaEvent {
|
|
||||||
workspace_id: prepared_turn.workspace_id.clone(),
|
|
||||||
request_id: request_id.clone(),
|
|
||||||
session_id: prepared_turn.session_id.clone(),
|
|
||||||
delta,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
let _ = app_handle.emit(
|
|
||||||
"agent_error",
|
|
||||||
AgentErrorEvent {
|
|
||||||
workspace_id: prepared_turn.workspace_id,
|
|
||||||
request_id,
|
|
||||||
session_id: prepared_turn.session_id,
|
|
||||||
message: error.to_string(),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store the final assistant message after the stream completes so the next
|
// Store the final assistant message after the stream completes so the next
|
||||||
// conversational turn reuses the full transcript.
|
// conversational turn reuses the full transcript.
|
||||||
if let Ok(mut agent) = app_handle.state::<AppState>().agent.lock() {
|
if let Ok(mut agent) = app_handle.state::<AppState>().agent.lock() {
|
||||||
let _ = agent.record_assistant_reply(&prepared_turn.session_id, &reply);
|
let _ = agent.record_assistant_reply(&prepared_turn.session_id, &reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = app_handle.emit(
|
let _ = stream_emitter.stream_complete();
|
||||||
"agent_result",
|
|
||||||
AgentResultEvent {
|
|
||||||
workspace_id: prepared_turn.workspace_id,
|
|
||||||
request_id,
|
|
||||||
session_id: prepared_turn.session_id,
|
|
||||||
reply,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(start)
|
Ok(start)
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import { terminalBridge } from './lib/terminalBridge';
|
|||||||
import { AgentConfigStatus } from './types/agentSettings';
|
import { AgentConfigStatus } from './types/agentSettings';
|
||||||
import { Portfolio } from './types/financial';
|
import { Portfolio } from './types/financial';
|
||||||
import {
|
import {
|
||||||
|
AgentStreamItemEvent,
|
||||||
ResolvedTerminalCommandResponse,
|
ResolvedTerminalCommandResponse,
|
||||||
PortfolioAction,
|
PortfolioAction,
|
||||||
PortfolioActionDraft,
|
PortfolioActionDraft,
|
||||||
@@ -212,18 +213,141 @@ function App() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Plain text keeps the current workspace conversation alive and streams into a placeholder response entry.
|
// Plain text keeps the current workspace conversation alive and appends streamed items in arrival order.
|
||||||
const panelContext = extractChatPanelContext(currentWorkspace?.history ?? []);
|
const panelContext = extractChatPanelContext(currentWorkspace?.history ?? []);
|
||||||
const commandEntry = createEntry({ type: 'command', content: resolvedCommand });
|
const commandEntry = createEntry({ type: 'command', content: resolvedCommand });
|
||||||
const responseEntry = createEntry({
|
let lastSequenceSeen = 0;
|
||||||
type: 'response',
|
let activeTextEntryId: string | null = null;
|
||||||
content: '',
|
let activeThinkingEntryId: string | null = null;
|
||||||
renderMode: 'markdown',
|
|
||||||
});
|
|
||||||
const toolCommandQueue: string[] = [];
|
|
||||||
|
|
||||||
tabs.appendWorkspaceEntry(workspaceId, commandEntry);
|
tabs.appendWorkspaceEntry(workspaceId, commandEntry);
|
||||||
tabs.appendWorkspaceEntry(workspaceId, responseEntry);
|
|
||||||
|
const appendStreamEntry = (
|
||||||
|
type: 'response' | 'thinking',
|
||||||
|
renderMode: 'plain' | 'markdown',
|
||||||
|
) => {
|
||||||
|
const entry = createEntry({
|
||||||
|
type,
|
||||||
|
content: '',
|
||||||
|
renderMode,
|
||||||
|
});
|
||||||
|
tabs.appendWorkspaceEntry(workspaceId, entry);
|
||||||
|
|
||||||
|
if (type === 'response') {
|
||||||
|
activeTextEntryId = entry.id;
|
||||||
|
} else {
|
||||||
|
activeThinkingEntryId = entry.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
return entry.id;
|
||||||
|
};
|
||||||
|
|
||||||
|
const closeTextSegment = () => {
|
||||||
|
activeTextEntryId = null;
|
||||||
|
};
|
||||||
|
|
||||||
|
const closeThinkingSegment = () => {
|
||||||
|
activeThinkingEntryId = null;
|
||||||
|
};
|
||||||
|
|
||||||
|
const appendDeltaToEntry = (entryId: string, delta: string, renderMode: 'plain' | 'markdown') => {
|
||||||
|
tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({
|
||||||
|
...entry,
|
||||||
|
content: typeof entry.content === 'string' ? `${entry.content}${delta}` : delta,
|
||||||
|
renderMode,
|
||||||
|
timestamp: new Date(),
|
||||||
|
}));
|
||||||
|
};
|
||||||
|
|
||||||
|
const processStreamItem = (event: Omit<AgentStreamItemEvent, 'response'> & {
|
||||||
|
response?: ResolvedTerminalCommandResponse;
|
||||||
|
}) => {
|
||||||
|
if (event.sequence <= lastSequenceSeen) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
lastSequenceSeen = event.sequence;
|
||||||
|
|
||||||
|
switch (event.kind) {
|
||||||
|
case 'reasoning_delta': {
|
||||||
|
if (!event.delta) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
closeTextSegment();
|
||||||
|
const entryId =
|
||||||
|
activeThinkingEntryId ?? appendStreamEntry('thinking', 'plain');
|
||||||
|
appendDeltaToEntry(entryId, event.delta, 'plain');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 'text_delta': {
|
||||||
|
if (!event.delta) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
closeThinkingSegment();
|
||||||
|
const entryId =
|
||||||
|
activeTextEntryId ?? appendStreamEntry('response', 'markdown');
|
||||||
|
appendDeltaToEntry(entryId, event.delta, 'markdown');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 'tool_command': {
|
||||||
|
if (!event.command) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
closeThinkingSegment();
|
||||||
|
closeTextSegment();
|
||||||
|
tabs.appendWorkspaceEntry(
|
||||||
|
workspaceId,
|
||||||
|
createEntry({ type: 'command', content: event.command }),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 'tool_result': {
|
||||||
|
if (!event.command || !event.response) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
closeThinkingSegment();
|
||||||
|
closeTextSegment();
|
||||||
|
appendResolvedCommandResponse(workspaceId, event.command, event.response);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 'approval_required': {
|
||||||
|
if (!event.approvalId || !event.command || !event.title || !event.message) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
setPendingAgentApproval({
|
||||||
|
approvalId: event.approvalId,
|
||||||
|
command: event.command,
|
||||||
|
requestId: event.requestId,
|
||||||
|
workspaceId: event.workspaceId,
|
||||||
|
title: event.title,
|
||||||
|
message: event.message,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 'stream_complete': {
|
||||||
|
tabs.setWorkspaceSession(workspaceId, event.sessionId);
|
||||||
|
closeThinkingSegment();
|
||||||
|
closeTextSegment();
|
||||||
|
setIsProcessing(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
case 'error': {
|
||||||
|
const entryId = activeTextEntryId ?? appendStreamEntry('response', 'plain');
|
||||||
|
closeThinkingSegment();
|
||||||
|
activeTextEntryId = entryId;
|
||||||
|
tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({
|
||||||
|
...entry,
|
||||||
|
type: 'error',
|
||||||
|
content: event.errorMessage ?? 'Chat stream failed.',
|
||||||
|
renderMode: 'plain',
|
||||||
|
timestamp: new Date(),
|
||||||
|
}));
|
||||||
|
setIsProcessing(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const start = await terminalBridge.startChatStream(
|
const start = await terminalBridge.startChatStream(
|
||||||
@@ -235,63 +359,14 @@ function App() {
|
|||||||
panelContext,
|
panelContext,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
onDelta: (event) => {
|
onStreamItem: processStreamItem,
|
||||||
// Update only the originating entry so workspace switches do not disrupt the active stream.
|
|
||||||
tabs.updateWorkspaceEntry(workspaceId, responseEntry.id, (entry) => ({
|
|
||||||
...entry,
|
|
||||||
content: typeof entry.content === 'string' ? `${entry.content}${event.delta}` : event.delta,
|
|
||||||
renderMode: 'markdown',
|
|
||||||
timestamp: new Date(),
|
|
||||||
}));
|
|
||||||
},
|
|
||||||
onResult: (event) => {
|
|
||||||
tabs.setWorkspaceSession(workspaceId, event.sessionId);
|
|
||||||
tabs.updateWorkspaceEntry(workspaceId, responseEntry.id, (entry) => ({
|
|
||||||
...entry,
|
|
||||||
type: 'response',
|
|
||||||
content: event.reply,
|
|
||||||
renderMode: 'markdown',
|
|
||||||
timestamp: new Date(),
|
|
||||||
}));
|
|
||||||
setIsProcessing(false);
|
|
||||||
},
|
|
||||||
onError: (event) => {
|
|
||||||
tabs.updateWorkspaceEntry(workspaceId, responseEntry.id, (entry) => ({
|
|
||||||
...entry,
|
|
||||||
type: 'error',
|
|
||||||
content: event.message,
|
|
||||||
renderMode: 'plain',
|
|
||||||
timestamp: new Date(),
|
|
||||||
}));
|
|
||||||
setIsProcessing(false);
|
|
||||||
},
|
|
||||||
onToolCommand: (event) => {
|
|
||||||
toolCommandQueue.push(event.command);
|
|
||||||
tabs.appendWorkspaceEntry(
|
|
||||||
workspaceId,
|
|
||||||
createEntry({ type: 'command', content: event.command }),
|
|
||||||
);
|
|
||||||
},
|
|
||||||
onToolResult: (event) => {
|
|
||||||
const command = toolCommandQueue.shift();
|
|
||||||
appendResolvedCommandResponse(workspaceId, command, event.response);
|
|
||||||
},
|
|
||||||
onToolApprovalRequired: (event) => {
|
|
||||||
setPendingAgentApproval({
|
|
||||||
approvalId: event.approvalId,
|
|
||||||
command: event.command,
|
|
||||||
requestId: event.requestId,
|
|
||||||
workspaceId: event.workspaceId,
|
|
||||||
title: event.title,
|
|
||||||
message: event.message,
|
|
||||||
});
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
tabs.setWorkspaceSession(workspaceId, start.sessionId);
|
tabs.setWorkspaceSession(workspaceId, start.sessionId);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
tabs.updateWorkspaceEntry(workspaceId, responseEntry.id, (entry) => ({
|
const entryId = activeTextEntryId ?? appendStreamEntry('response', 'plain');
|
||||||
|
tabs.updateWorkspaceEntry(workspaceId, entryId, (entry) => ({
|
||||||
...entry,
|
...entry,
|
||||||
type: 'error',
|
type: 'error',
|
||||||
content: error instanceof Error ? error.message : 'Chat stream failed.',
|
content: error instanceof Error ? error.message : 'Chat stream failed.',
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import React, { useEffect } from 'react';
|
import React, { useCallback, useEffect, useRef } from 'react';
|
||||||
import ReactMarkdown from 'react-markdown';
|
import ReactMarkdown from 'react-markdown';
|
||||||
import remarkGfm from 'remark-gfm';
|
import remarkGfm from 'remark-gfm';
|
||||||
import {
|
import {
|
||||||
@@ -33,12 +33,66 @@ export const TerminalOutput: React.FC<TerminalOutputProps> = ({
|
|||||||
onRunCommand,
|
onRunCommand,
|
||||||
onStartPortfolioAction,
|
onStartPortfolioAction,
|
||||||
}) => {
|
}) => {
|
||||||
// Auto-scroll to bottom when history changes
|
const contentRef = useRef<HTMLDivElement | null>(null);
|
||||||
useEffect(() => {
|
const shouldStickToBottomRef = useRef(true);
|
||||||
if (outputRef.current) {
|
|
||||||
outputRef.current.scrollTop = outputRef.current.scrollHeight;
|
const isNearBottom = useCallback((element: HTMLDivElement) => {
|
||||||
|
const distanceFromBottom =
|
||||||
|
element.scrollHeight - element.scrollTop - element.clientHeight;
|
||||||
|
return distanceFromBottom < 48;
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const scrollToBottom = useCallback(() => {
|
||||||
|
const element = outputRef.current;
|
||||||
|
if (!element || !shouldStickToBottomRef.current) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}, [history, outputRef]);
|
|
||||||
|
requestAnimationFrame(() => {
|
||||||
|
requestAnimationFrame(() => {
|
||||||
|
element.scrollTop = element.scrollHeight;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}, [outputRef]);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
const element = outputRef.current;
|
||||||
|
if (!element) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const handleScroll = () => {
|
||||||
|
shouldStickToBottomRef.current = isNearBottom(element);
|
||||||
|
};
|
||||||
|
|
||||||
|
handleScroll();
|
||||||
|
element.addEventListener('scroll', handleScroll);
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
element.removeEventListener('scroll', handleScroll);
|
||||||
|
};
|
||||||
|
}, [isNearBottom, outputRef]);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
scrollToBottom();
|
||||||
|
}, [history, scrollToBottom]);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
const element = outputRef.current;
|
||||||
|
const content = contentRef.current;
|
||||||
|
if (!element || !content || typeof ResizeObserver === 'undefined') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const observer = new ResizeObserver(() => {
|
||||||
|
if (shouldStickToBottomRef.current) {
|
||||||
|
element.scrollTop = element.scrollHeight;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
observer.observe(content);
|
||||||
|
return () => observer.disconnect();
|
||||||
|
}, [outputRef]);
|
||||||
|
|
||||||
const renderPlainText = (content: string) => {
|
const renderPlainText = (content: string) => {
|
||||||
const lines = content.split('\n');
|
const lines = content.split('\n');
|
||||||
@@ -145,6 +199,8 @@ export const TerminalOutput: React.FC<TerminalOutputProps> = ({
|
|||||||
return 'text-[#58a6ff]';
|
return 'text-[#58a6ff]';
|
||||||
case 'system':
|
case 'system':
|
||||||
return 'text-[#888888] italic';
|
return 'text-[#888888] italic';
|
||||||
|
case 'thinking':
|
||||||
|
return 'text-[#8aa1bc]';
|
||||||
case 'error':
|
case 'error':
|
||||||
return 'text-[#ff4757]';
|
return 'text-[#ff4757]';
|
||||||
case 'panel':
|
case 'panel':
|
||||||
@@ -163,6 +219,8 @@ export const TerminalOutput: React.FC<TerminalOutputProps> = ({
|
|||||||
return 'mb-2'; // Less space after commands
|
return 'mb-2'; // Less space after commands
|
||||||
case 'error':
|
case 'error':
|
||||||
return 'mb-4'; // Moderate space for errors
|
return 'mb-4'; // Moderate space for errors
|
||||||
|
case 'thinking':
|
||||||
|
return 'mb-3';
|
||||||
default:
|
default:
|
||||||
return 'mb-3'; // Default space
|
return 'mb-3'; // Default space
|
||||||
}
|
}
|
||||||
@@ -232,6 +290,7 @@ export const TerminalOutput: React.FC<TerminalOutputProps> = ({
|
|||||||
scrollbarColor: '#2a2a2a #111111'
|
scrollbarColor: '#2a2a2a #111111'
|
||||||
}}
|
}}
|
||||||
>
|
>
|
||||||
|
<div ref={contentRef}>
|
||||||
{history.map((entry) => (
|
{history.map((entry) => (
|
||||||
<div
|
<div
|
||||||
key={entry.id}
|
key={entry.id}
|
||||||
@@ -247,7 +306,20 @@ export const TerminalOutput: React.FC<TerminalOutputProps> = ({
|
|||||||
</div>
|
</div>
|
||||||
)}
|
)}
|
||||||
|
|
||||||
{entry.type !== 'command' && entry.type !== 'panel' && (
|
{entry.type === 'thinking' && (
|
||||||
|
<div className="rounded-lg border border-[#223044] bg-[#0f1724]/70 px-3 py-2">
|
||||||
|
<div className="mb-1 text-[10px] font-mono uppercase tracking-[0.24em] text-[#5d7491]">
|
||||||
|
Thinking
|
||||||
|
</div>
|
||||||
|
<div className={getEntryColor(entry.type)}>
|
||||||
|
{renderContent(entry)}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
|
{entry.type !== 'command' &&
|
||||||
|
entry.type !== 'panel' &&
|
||||||
|
entry.type !== 'thinking' && (
|
||||||
<div className={getEntryColor(entry.type)}>
|
<div className={getEntryColor(entry.type)}>
|
||||||
{renderContent(entry)}
|
{renderContent(entry)}
|
||||||
</div>
|
</div>
|
||||||
@@ -273,5 +345,6 @@ export const TerminalOutput: React.FC<TerminalOutputProps> = ({
|
|||||||
</div>
|
</div>
|
||||||
)}
|
)}
|
||||||
</div>
|
</div>
|
||||||
|
</div>
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -2,12 +2,7 @@ import { invoke } from '@tauri-apps/api/core';
|
|||||||
import { listen, type UnlistenFn } from '@tauri-apps/api/event';
|
import { listen, type UnlistenFn } from '@tauri-apps/api/event';
|
||||||
import { NewsItem } from '../types/financial';
|
import { NewsItem } from '../types/financial';
|
||||||
import {
|
import {
|
||||||
AgentDeltaEvent,
|
AgentStreamItemEvent,
|
||||||
AgentErrorEvent,
|
|
||||||
AgentResultEvent,
|
|
||||||
AgentToolApprovalRequiredEvent,
|
|
||||||
AgentToolCommandEvent,
|
|
||||||
AgentToolResultEvent,
|
|
||||||
ChatStreamStart,
|
ChatStreamStart,
|
||||||
LookupCompanyRequest,
|
LookupCompanyRequest,
|
||||||
ExecuteTerminalCommandRequest,
|
ExecuteTerminalCommandRequest,
|
||||||
@@ -16,19 +11,15 @@ import {
|
|||||||
ResolvedTerminalCommandResponse,
|
ResolvedTerminalCommandResponse,
|
||||||
StartChatStreamRequest,
|
StartChatStreamRequest,
|
||||||
TerminalCommandResponse,
|
TerminalCommandResponse,
|
||||||
TransportAgentToolResultEvent,
|
|
||||||
TransportPanelPayload,
|
TransportPanelPayload,
|
||||||
} from '../types/terminal';
|
} from '../types/terminal';
|
||||||
import { Company } from '../types/financial';
|
import { Company } from '../types/financial';
|
||||||
|
|
||||||
interface StreamCallbacks {
|
interface StreamCallbacks {
|
||||||
workspaceId: string;
|
workspaceId: string;
|
||||||
onDelta: (event: AgentDeltaEvent) => void;
|
onStreamItem: (event: Omit<AgentStreamItemEvent, 'response'> & {
|
||||||
onResult: (event: AgentResultEvent) => void;
|
response?: ResolvedTerminalCommandResponse;
|
||||||
onError: (event: AgentErrorEvent) => void;
|
}) => void;
|
||||||
onToolCommand: (event: AgentToolCommandEvent) => void;
|
|
||||||
onToolResult: (event: AgentToolResultEvent) => void;
|
|
||||||
onToolApprovalRequired: (event: AgentToolApprovalRequiredEvent) => void;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const deserializePanelPayload = (payload: TransportPanelPayload): PanelPayload => {
|
const deserializePanelPayload = (payload: TransportPanelPayload): PanelPayload => {
|
||||||
@@ -72,53 +63,23 @@ class TerminalBridge {
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.listenersReady = Promise.all([
|
this.listenersReady = Promise.all([
|
||||||
// Route incremental stream events back to the workspace that initiated the request.
|
listen<AgentStreamItemEvent>('agent_stream_item', (event) => {
|
||||||
listen<AgentDeltaEvent>('agent_delta', (event) => {
|
|
||||||
const callbacks = this.streamCallbacks.get(event.payload.requestId);
|
const callbacks = this.streamCallbacks.get(event.payload.requestId);
|
||||||
if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) {
|
if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
callbacks.onDelta(event.payload);
|
callbacks.onStreamItem({
|
||||||
}),
|
|
||||||
listen<AgentResultEvent>('agent_result', (event) => {
|
|
||||||
const callbacks = this.streamCallbacks.get(event.payload.requestId);
|
|
||||||
if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
callbacks.onResult(event.payload);
|
|
||||||
this.streamCallbacks.delete(event.payload.requestId);
|
|
||||||
}),
|
|
||||||
listen<AgentErrorEvent>('agent_error', (event) => {
|
|
||||||
const callbacks = this.streamCallbacks.get(event.payload.requestId);
|
|
||||||
if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
callbacks.onError(event.payload);
|
|
||||||
this.streamCallbacks.delete(event.payload.requestId);
|
|
||||||
}),
|
|
||||||
listen<AgentToolCommandEvent>('agent_tool_command', (event) => {
|
|
||||||
const callbacks = this.streamCallbacks.get(event.payload.requestId);
|
|
||||||
if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
callbacks.onToolCommand(event.payload);
|
|
||||||
}),
|
|
||||||
listen<TransportAgentToolResultEvent>('agent_tool_result', (event) => {
|
|
||||||
const callbacks = this.streamCallbacks.get(event.payload.requestId);
|
|
||||||
if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
callbacks.onToolResult({
|
|
||||||
...event.payload,
|
...event.payload,
|
||||||
response: deserializeTerminalCommandResponse(event.payload.response),
|
response: event.payload.response
|
||||||
|
? deserializeTerminalCommandResponse(event.payload.response)
|
||||||
|
: undefined,
|
||||||
});
|
});
|
||||||
}),
|
if (
|
||||||
listen<AgentToolApprovalRequiredEvent>('agent_tool_approval_required', (event) => {
|
event.payload.kind === 'stream_complete' ||
|
||||||
const callbacks = this.streamCallbacks.get(event.payload.requestId);
|
event.payload.kind === 'error'
|
||||||
if (!callbacks || callbacks.workspaceId !== event.payload.workspaceId) {
|
) {
|
||||||
return;
|
this.streamCallbacks.delete(event.payload.requestId);
|
||||||
}
|
}
|
||||||
callbacks.onToolApprovalRequired(event.payload);
|
|
||||||
}),
|
}),
|
||||||
]).then((unlistenFns) => {
|
]).then((unlistenFns) => {
|
||||||
this.unlistenFns = unlistenFns;
|
this.unlistenFns = unlistenFns;
|
||||||
|
|||||||
@@ -75,61 +75,33 @@ export interface ResolveAgentToolApprovalRequest {
|
|||||||
approved: boolean;
|
approved: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface AgentDeltaEvent {
|
export type AgentStreamItemKind =
|
||||||
workspaceId: string;
|
| 'reasoning_delta'
|
||||||
requestId: string;
|
| 'text_delta'
|
||||||
sessionId: string;
|
| 'tool_command'
|
||||||
delta: string;
|
| 'tool_result'
|
||||||
}
|
| 'approval_required'
|
||||||
|
| 'stream_complete'
|
||||||
|
| 'error';
|
||||||
|
|
||||||
export interface AgentResultEvent {
|
export interface AgentStreamItemEvent {
|
||||||
workspaceId: string;
|
workspaceId: string;
|
||||||
requestId: string;
|
requestId: string;
|
||||||
sessionId: string;
|
sessionId: string;
|
||||||
reply: string;
|
sequence: number;
|
||||||
}
|
kind: AgentStreamItemKind;
|
||||||
|
delta?: string;
|
||||||
export interface AgentErrorEvent {
|
command?: string;
|
||||||
workspaceId: string;
|
response?: TerminalCommandResponse;
|
||||||
requestId: string;
|
approvalId?: string;
|
||||||
sessionId: string;
|
title?: string;
|
||||||
message: string;
|
message?: string;
|
||||||
}
|
errorMessage?: string;
|
||||||
|
|
||||||
export interface AgentToolCommandEvent {
|
|
||||||
workspaceId: string;
|
|
||||||
requestId: string;
|
|
||||||
sessionId: string;
|
|
||||||
command: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface TransportAgentToolResultEvent {
|
|
||||||
workspaceId: string;
|
|
||||||
requestId: string;
|
|
||||||
sessionId: string;
|
|
||||||
response: TerminalCommandResponse;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface AgentToolResultEvent {
|
|
||||||
workspaceId: string;
|
|
||||||
requestId: string;
|
|
||||||
sessionId: string;
|
|
||||||
response: ResolvedTerminalCommandResponse;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface AgentToolApprovalRequiredEvent {
|
|
||||||
workspaceId: string;
|
|
||||||
requestId: string;
|
|
||||||
sessionId: string;
|
|
||||||
approvalId: string;
|
|
||||||
command: string;
|
|
||||||
title: string;
|
|
||||||
message: string;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface TerminalEntry {
|
export interface TerminalEntry {
|
||||||
id: string;
|
id: string;
|
||||||
type: 'command' | 'response' | 'system' | 'error' | 'panel';
|
type: 'command' | 'response' | 'system' | 'error' | 'panel' | 'thinking';
|
||||||
content: string | PanelPayload;
|
content: string | PanelPayload;
|
||||||
renderMode?: 'plain' | 'markdown';
|
renderMode?: 'plain' | 'markdown';
|
||||||
timestamp?: Date;
|
timestamp?: Date;
|
||||||
|
|||||||
Reference in New Issue
Block a user