Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/agent/agent_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,18 @@ impl Agent {
}

async fn handle_message(&self, message: &IncomingMessage) -> Result<Option<String>, Error> {
// Log at info level only for tracking without exposing PII (user_id can be a phone number)
tracing::info!(message_id = %message.id, "Processing message");

// Log sensitive details at debug level for troubleshooting
tracing::debug!(
message_id = %message.id,
user_id = %message.user_id,
channel = %message.channel,
thread_id = ?message.thread_id,
"Message details"
);

// Set message tool context for this turn (current channel and target)
// For Signal, use signal_target from metadata (group:ID or phone number),
// otherwise fall back to user_id
Expand Down Expand Up @@ -784,10 +796,19 @@ impl Agent {

// Hydrate thread from DB if it's a historical thread not in memory
if let Some(ref external_thread_id) = message.thread_id {
tracing::debug!(
message_id = %message.id,
thread_id = %external_thread_id,
"Hydrating thread from DB"
);
self.maybe_hydrate_thread(message, external_thread_id).await;
}

// Resolve session and thread
tracing::debug!(
message_id = %message.id,
"Resolving session and thread"
);
let (session, thread_id) = self
.session_manager
.resolve_thread(
Expand All @@ -796,6 +817,11 @@ impl Agent {
message.thread_id.as_deref(),
)
.await;
tracing::info!(
message_id = %message.id,
thread_id = %thread_id,
"Resolved session and thread"
);

// Auth mode interception: if the thread is awaiting a token, route
// the message directly to the credential store. Nothing touches
Expand Down
40 changes: 40 additions & 0 deletions src/agent/thread_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ impl Agent {
thread_id: Uuid,
content: &str,
) -> Result<SubmissionResult, Error> {
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
content_len = content.len(),
"Processing user input"
);

// First check thread state without holding lock during I/O
let thread_state = {
let sess = session.lock().await;
Expand All @@ -123,19 +130,41 @@ impl Agent {
thread.state
};

tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
thread_state = ?thread_state,
"Checked thread state"
);

// Check thread state
match thread_state {
ThreadState::Processing => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread is processing, rejecting new input"
);
return Ok(SubmissionResult::error(
"Turn in progress. Use /interrupt to cancel.",
));
}
ThreadState::AwaitingApproval => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread awaiting approval, rejecting new input"
);
return Ok(SubmissionResult::error(
"Waiting for approval. Use /interrupt to cancel.",
));
}
ThreadState::Completed => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread completed, rejecting new input"
);
return Ok(SubmissionResult::error(
"Thread completed. Use /thread new.",
));
Expand Down Expand Up @@ -269,9 +298,20 @@ impl Agent {
};

// Persist user message to DB immediately so it survives crashes
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
"Persisting user message to DB"
);
self.persist_user_message(thread_id, &message.user_id, effective_content)
.await;

tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
"User message persisted, starting agentic loop"
);

// Send thinking status
let _ = self
.channels
Expand Down
134 changes: 79 additions & 55 deletions src/channels/web/handlers/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub async fn chat_send_handler(
}

let msg_id = msg.id;
let thread_id = msg.thread_id.clone();

let tx_guard = state.msg_tx.read().await;
let tx = tx_guard.as_ref().ok_or((
Expand All @@ -49,6 +50,13 @@ pub async fn chat_send_handler(
)
})?;

tracing::debug!(
message_id = %msg_id,
thread_id = ?thread_id,
content_len = req.content.len(),
"Message queued to agent loop"
);

Ok((
StatusCode::ACCEPTED,
Json(SendMessageResponse {
Expand Down Expand Up @@ -263,7 +271,6 @@ pub async fn chat_history_handler(
))?;

let session = session_manager.get_or_create_session(&state.user_id).await;
let sess = session.lock().await;

let limit = query.limit.unwrap_or(50);
let before_cursor = query
Expand All @@ -281,11 +288,12 @@ pub async fn chat_history_handler(
})
.transpose()?;

// Find the thread
// Find the thread (lock only briefly to get active_thread if needed)
let thread_id = if let Some(ref tid) = query.thread_id {
Uuid::parse_str(tid)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid thread_id".to_string()))?
} else {
let sess = session.lock().await;
sess.active_thread
.ok_or((StatusCode::NOT_FOUND, "No active thread".to_string()))?
};
Expand All @@ -298,8 +306,11 @@ pub async fn chat_history_handler(
.conversation_belongs_to_user(thread_id, &state.user_id)
.await
.unwrap_or(false);
if !owned && !sess.threads.contains_key(&thread_id) {
return Err((StatusCode::NOT_FOUND, "Thread not found".to_string()));
if !owned {
let sess = session.lock().await;
if !sess.threads.contains_key(&thread_id) {
return Err((StatusCode::NOT_FOUND, "Thread not found".to_string()));
}
}
}

Expand All @@ -324,56 +335,60 @@ pub async fn chat_history_handler(
}

// Try in-memory first (freshest data for active threads)
if let Some(thread) = sess.threads.get(&thread_id)
&& (!thread.turns.is_empty() || thread.pending_approval.is_some())
// Lock only when checking in-memory state
{
let turns: Vec<TurnInfo> = thread
.turns
.iter()
.map(|t| TurnInfo {
turn_number: t.turn_number,
user_input: t.user_input.clone(),
response: t.response.clone(),
state: format!("{:?}", t.state),
started_at: t.started_at.to_rfc3339(),
completed_at: t.completed_at.map(|dt| dt.to_rfc3339()),
tool_calls: t
.tool_calls
.iter()
.map(|tc| ToolCallInfo {
name: tc.name.clone(),
has_result: tc.result.is_some(),
has_error: tc.error.is_some(),
result_preview: tc.result.as_ref().map(|r| {
let s = match r {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
truncate_preview(&s, 500)
}),
error: tc.error.clone(),
})
.collect(),
})
.collect();

let pending_approval = thread
.pending_approval
.as_ref()
.map(|pa| PendingApprovalInfo {
request_id: pa.request_id.to_string(),
tool_name: pa.tool_name.clone(),
description: pa.description.clone(),
parameters: serde_json::to_string_pretty(&pa.parameters).unwrap_or_default(),
});
let sess = session.lock().await;
if let Some(thread) = sess.threads.get(&thread_id)
&& (!thread.turns.is_empty() || thread.pending_approval.is_some())
{
let turns: Vec<TurnInfo> = thread
.turns
.iter()
.map(|t| TurnInfo {
turn_number: t.turn_number,
user_input: t.user_input.clone(),
response: t.response.clone(),
state: format!("{:?}", t.state),
started_at: t.started_at.to_rfc3339(),
completed_at: t.completed_at.map(|dt| dt.to_rfc3339()),
tool_calls: t
.tool_calls
.iter()
.map(|tc| ToolCallInfo {
name: tc.name.clone(),
has_result: tc.result.is_some(),
has_error: tc.error.is_some(),
result_preview: tc.result.as_ref().map(|r| {
let s = match r {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
truncate_preview(&s, 500)
}),
error: tc.error.clone(),
})
.collect(),
})
.collect();

let pending_approval = thread
.pending_approval
.as_ref()
.map(|pa| PendingApprovalInfo {
request_id: pa.request_id.to_string(),
tool_name: pa.tool_name.clone(),
description: pa.description.clone(),
parameters: serde_json::to_string_pretty(&pa.parameters).unwrap_or_default(),
});

return Ok(Json(HistoryResponse {
thread_id,
turns,
has_more: false,
oldest_timestamp: None,
pending_approval,
}));
return Ok(Json(HistoryResponse {
thread_id,
turns,
has_more: false,
oldest_timestamp: None,
pending_approval,
}));
}
}

// Fall back to DB for historical threads not in memory (paginated)
Expand Down Expand Up @@ -415,7 +430,6 @@ pub async fn chat_threads_handler(
))?;

let session = session_manager.get_or_create_session(&state.user_id).await;
let sess = session.lock().await;

// Try DB first for persistent thread list
if let Some(ref store) = state.store {
Expand Down Expand Up @@ -465,15 +479,22 @@ pub async fn chat_threads_handler(
});
}

// Read active thread while holding minimal lock (just before return)
let active_thread = {
let sess = session.lock().await;
sess.active_thread
};

return Ok(Json(ThreadListResponse {
assistant_thread,
threads,
active_thread: sess.active_thread,
active_thread,
}));
}
}

// Fallback: in-memory only (no assistant thread without DB)
let sess = session.lock().await;
let mut sorted_threads: Vec<_> = sess.threads.values().collect();
sorted_threads.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
let threads: Vec<ThreadInfo> = sorted_threads
Expand All @@ -490,10 +511,13 @@ pub async fn chat_threads_handler(
})
.collect();

let active_thread = sess.active_thread;
drop(sess); // Explicit drop to release lock

Ok(Json(ThreadListResponse {
assistant_thread: None,
threads,
active_thread: sess.active_thread,
active_thread,
}))
}

Expand Down
Loading
Loading