Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 27 additions & 31 deletions rust-executor/src/perspectives/perspective_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,6 @@ impl PerspectiveInstance {
}

pub async fn diff_from_link_language(&self, diff: PerspectiveDiff) -> Result<(), AnyError> {
let handle = self.persisted.lock().await.clone();

// Deduplicate by (author, timestamp, source, predicate, target)
// Use structured keys to avoid delimiter collision issues
let mut seen_add: std::collections::HashSet<String> = std::collections::HashSet::new();
Expand Down Expand Up @@ -1036,7 +1034,6 @@ impl PerspectiveInstance {
status: LinkStatus,
batch_id: Option<String>,
) -> Result<DecoratedLinkExpression, AnyError> {
let handle = self.persisted.lock().await.clone();
if let Some(batch_id) = batch_id {
let mut batches = self.batch_store.write().await;
let diff = batches
Expand Down Expand Up @@ -1134,7 +1131,6 @@ impl PerspectiveInstance {
status: LinkStatus,
context: &AgentContext,
) -> Result<DecoratedPerspectiveDiff, AnyError> {
let handle = self.persisted.lock().await.clone();
let additions = mutations
.additions
.into_iter()
Expand Down Expand Up @@ -1197,7 +1193,7 @@ impl PerspectiveInstance {
)
.await?;

let (link, link_status) = match decorated_link_option {
let (_link, link_status) = match decorated_link_option {
Some(decorated) => {
let status = decorated.status.clone().unwrap_or(LinkStatus::Local);
(LinkExpression::from(decorated), status)
Expand Down Expand Up @@ -1345,9 +1341,6 @@ impl PerspectiveInstance {
// Split into links and statuses
let (links, statuses): (Vec<_>, Vec<_>) = existing_links.into_iter().unzip();

// Create diff from links that exist
let diff = PerspectiveDiff::from_removals(links.clone());

// Create decorated versions
let decorated_links: Vec<DecoratedLinkExpression> = links
.into_iter()
Expand Down Expand Up @@ -1619,30 +1612,27 @@ impl PerspectiveInstance {
}

async fn ensure_prolog_engine_pool(&self) -> Result<(), AnyError> {
// Take write lock and check if we need to initialize
let _guard = self.prolog_update_mutex.write().await;

// Get service reference before taking any locks
// Get service reference and perspective data BEFORE acquiring write lock
let service = get_prolog_service().await;
let persisted = self.persisted.lock().await;
let uuid = persisted.uuid.clone();
let owner_did = persisted.get_primary_owner();
drop(persisted); // Release the lock early
let (uuid, owner_did, neighbourhood_author) = {
let persisted = self.persisted.lock().await;
let uuid = persisted.uuid.clone();
let owner_did = persisted.get_primary_owner();
let neighbourhood_author = persisted.neighbourhood.as_ref().map(|n| n.author.clone());
(uuid, owner_did, neighbourhood_author)
};

// Check if initialization is needed WITHOUT holding any locks
if !service.has_perspective_pool(uuid.clone()).await
|| !service
.has_perspective_pool(notification_pool_name(&uuid))
.await
{
// Initialize with links for optimized filtering
// Get all links BEFORE acquiring write lock to avoid deadlock
let all_links = self.get_links(&LinkQuery::default()).await?;
let neighbourhood_author = self
.persisted
.lock()
.await
.neighbourhood
.as_ref()
.map(|n| n.author.clone());

// NOW take write lock after all async operations that might need locks are done
let _guard = self.prolog_update_mutex.write().await;

// Check if pool exists under the write lock
if !service.has_perspective_pool(uuid.clone()).await {
Expand Down Expand Up @@ -4061,6 +4051,17 @@ impl PerspectiveInstance {
}

async fn subscribed_queries_loop(&self) {
// Prolog subscriptions only make sense in Simple and Pooled modes.
// In SdnaOnly mode link queries don't work (only SDNA queries do), and in
// Disabled mode Prolog is disabled entirely, so the loop returns immediately.
if PROLOG_MODE == PrologMode::SdnaOnly || PROLOG_MODE == PrologMode::Disabled {
log::debug!(
"Prolog subscription loop disabled in {:?} mode",
PROLOG_MODE
);
return;
}

let mut log_counter = 0;
const LOG_INTERVAL: u32 = 300; // Log every ~60 seconds (300 * 200ms)

Expand All @@ -4077,9 +4078,10 @@ impl PerspectiveInstance {
log_counter += 1;
if log_counter >= LOG_INTERVAL {
log_counter = 0;
// Get perspective_uuid FIRST before acquiring subscribed_queries lock to avoid deadlock
let perspective_uuid = self.persisted.lock().await.uuid.clone();
let queries = self.subscribed_queries.lock().await;
if !queries.is_empty() {
let perspective_uuid = self.persisted.lock().await.uuid.clone();
log::info!(
"📊 Prolog subscriptions [{}]: {} active",
perspective_uuid,
Expand Down Expand Up @@ -4253,12 +4255,6 @@ impl PerspectiveInstance {
// shared_diff.additions.len(), shared_diff.removals.len(),
// local_diff.additions.len(), local_diff.removals.len());

// Get UUID without holding lock during DB operations
let uuid = {
let handle = self.persisted.lock().await;
handle.uuid.clone()
};

// Apply shared changes
if !shared_diff.additions.is_empty() || !shared_diff.removals.is_empty() {
//let db_start = std::time::Instant::now();
Expand Down
54 changes: 22 additions & 32 deletions rust-executor/src/prolog_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ impl PrologService {
let processed_facts =
PoolUtils::preprocess_program_lines(facts_to_load, &embedding_cache).await;

// LOCK SCOPE: Acquire write lock ONLY to get mutable engine references
// CRITICAL FIX: Hold the write lock for the entire update to prevent deadlock.
// Previously, we released the lock and temporarily moved the engines out, leaving
// dummy placeholders (fresh `PrologEngine::new()` values) in their place.
// That caused deadlocks: other threads could acquire the read lock, get the dummy engines,
// try to query them, and block forever waiting for non-existent background threads.
// Now the lock stays held during the update. Yes, this blocks other queries temporarily,
// but updates are rare (only when dirty) and blocking is better than deadlock.
let mut engines = self.simple_engines.write().await;

// Use Entry API to avoid race condition between check and insert
Expand All @@ -280,54 +285,36 @@ impl PrologService {
}
}

// Get mutable reference and move engines out temporarily
// Get mutable reference to engines
let simple_engine = engines.get_mut(&engine_key).unwrap();

// Move engines out of the struct temporarily
let query_engine_to_update =
std::mem::replace(&mut simple_engine.query_engine, PrologEngine::new());
let subscription_engine_to_update =
std::mem::replace(&mut simple_engine.subscription_engine, PrologEngine::new());

// Release write lock before expensive load operations
drop(engines);

// EXPENSIVE OPERATIONS OUTSIDE THE LOCK:
// Load facts into both engines - wrap in error handling to restore on failure
// Load facts directly into the engines while holding the lock
// This prevents other threads from querying dummy/uninitialized engines
let load_result = async {
query_engine_to_update
simple_engine
.query_engine
.load_module_string("facts", &processed_facts)
.await?;
subscription_engine_to_update
simple_engine
.subscription_engine
.load_module_string("facts", &processed_facts)
.await?;
Ok::<_, Error>(())
}
.await;

// Handle load failure by restoring original engines
// Handle load failure
if let Err(e) = load_result {
let mut engines = self.simple_engines.write().await;
let simple_engine = engines.get_mut(&engine_key).unwrap();

// Restore the original engines
simple_engine.query_engine = query_engine_to_update;
simple_engine.subscription_engine = subscription_engine_to_update;

// Mark dirty so update will be retried
// Mark dirty so update will be retried on next query
simple_engine.dirty = true;

// Release lock before returning error
drop(engines);

return Err(e);
}

// LOCK SCOPE: Reacquire write lock to update final state
let mut engines = self.simple_engines.write().await;
let simple_engine = engines.get_mut(&engine_key).unwrap();

// Move engines back
simple_engine.query_engine = query_engine_to_update;
simple_engine.subscription_engine = subscription_engine_to_update;

// Update succeeded - mark as clean and update metadata
simple_engine.dirty = false;

// MEMORY OPTIMIZATION: In SdnaOnly mode, don't store full links
Expand All @@ -342,6 +329,9 @@ impl PrologService {
"Prolog engines {} updated successfully (query + subscription)",
perspective_id
);

// Explicitly drop lock to make it clear when it's released
drop(engines);
}

Ok(())
Expand Down