Fix crash when creating Flux channel with all Plugins #600
Walkthrough

The Prolog service engine has been refactored to replace asynchronous Tokio-based channels and runtime with Rust's standard synchronous mpsc channels, protected by Arc and Mutex for thread safety. The engine's request and response handling now uses blocking threads and synchronous message passing. Query execution logic was updated to limit the number of results, handle panics per iteration, and provide detailed error reporting. The engine pool now centralizes error handling for engine failures in a dedicated method, and adjusts to the new nested result structure from query execution. Test code received a minor import update and a slight change in error message text in assertions.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant EnginePool
    participant PrologEngine
    participant WorkerThread
    Client->>EnginePool: run_query(query)
    EnginePool->>PrologEngine: run_query(query)
    PrologEngine->>WorkerThread: Send request via mpsc channel
    WorkerThread-->>PrologEngine: Process query, collect results (limit 1M, catch panics)
    PrologEngine-->>EnginePool: Return QueryResult (or Error)
    EnginePool-->>Client: Return QueryResult or handle error (invalidate engine if needed)
```
Actionable comments posted: 1
🔭 Outside diff range comments (1)
rust-executor/src/prolog_service/engine_pool.rs (1)
98-116: 🛠️ Refactor suggestion
⚠️ Potential issue

Release the read-lock before awaiting or upgrading to a write-lock to avoid deadlocks

`run_query` acquires an `RwLockReadGuard` (`engines`) and keeps it alive until the end of the function. When an error occurs we delegate to `handle_engine_error`, which tries to acquire a write lock on the same `RwLock`. Because the read guard is still held, the task will hang forever – Tokio's `RwLock` is not re-entrant and cannot upgrade from read to write. In addition, the read guard is also held across the potentially long-running `await` of `engine.run_query`, blocking every other writer in the system.

A minimal fix is to drop the guard once we have extracted the engine slot index:

```diff
-        let engines = self.engines.read().await;
+        let engines_guard = self.engines.read().await;
         ...
-        let (engine_idx, engine) = valid_engines[idx];
+        let (engine_idx, engine) = valid_engines[idx];
+
+        // We no longer need the lock; release it before the await below
+        drop(engines_guard);
```

That single `drop()`:

- Eliminates the deadlock when `handle_engine_error` tries to write.
- Frees other tasks to write / invalidate engines while the query is executing.

Without this change the pool will hang on the first engine panic under load.

Also applies to: 124-139
🧹 Nitpick comments (3)
rust-executor/src/prolog_service/engine_pool.rs (1)
90-97: Minor: include engine index in the log and consider returning the original error type

Nice centralisation of the failure path! Two micro suggestions:

- Logging the `engine_idx` helps correlate with other metrics (memory, CPU, etc.).
- `handle_engine_error` boxes the original error into `anyhow!`, losing its concrete type. Forwarding the `Error` unchanged lets callers down-cast if they want.

Both are optional, but improve observability and debuggability.
rust-executor/src/prolog_service/engine.rs (2)
55-63: Hold the `Mutex` only while receiving – prevent accidental starvation

The current pattern locks the `Receiver` once, then holds the mutex for the entire lifetime of the service loop:

```rust
let receiver = receiver.0.lock().unwrap(); // lock never released
while let Ok(message) = receiver.recv() {
    ...
}
```

If additional threads are ever introduced that also need to `recv`, they will starve forever. A safer pattern is:

```diff
-let receiver = receiver.0.lock().unwrap();
-while let Ok(message) = receiver.recv() {
+loop {
+    let message = {
+        let guard = receiver.0.lock().unwrap();
+        guard.recv()
+    };
+    match message {
         ...
     }
 }
```

This keeps the critical section minimal and future-proofs the code.
67-113: Potential OOM risk when collecting up to 1,000,000 answers

`Vec::with_capacity(MAX_RESULTS)` is not used, so the vector will realloc several times and may still attempt to store a million possibly large `Term`s, exhausting memory.

Consider streaming results back to the caller or switching to a chunked protocol (e.g. send every N answers) to bound peak memory usage. If a hard cap is still required, pre-allocate and early-return when the limit is hit to avoid extra `push`es.

Not critical for correctness, but worth revisiting if queries may return large payloads.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- rust-executor/src/prolog_service/engine.rs (5 hunks)
- rust-executor/src/prolog_service/engine_pool.rs (2 hunks)
Actionable comments posted: 1
🧹 Nitpick comments (3)
rust-executor/src/prolog_service/engine.rs (3)
27-29: Consider documenting the SendableReceiver wrapper and its safety implications.

The new `SendableReceiver` wrapper is crucial for thread safety, but lacks documentation explaining its purpose and safety guarantees. The unsafe `Sync` implementation needs justification.

```diff
+/// Wrapper around mpsc::Receiver that makes it sharable between threads.
+/// This is safe because we protect the receiver with a mutex.
 struct SendableReceiver<T>(Arc<Mutex<mpsc::Receiver<T>>>);
+/// Safe because the inner Receiver is protected by a Mutex
 unsafe impl<T> Sync for SendableReceiver<T> {}
```
67-67: Consider making MAX_RESULTS configurable.

The hard-coded limit of 1,000,000 results might not be appropriate for all use cases. Consider making this configurable through a parameter or environment variable.

```diff
- const MAX_RESULTS: usize = 1_000_000; // Adjust as needed
+ // Get limit from config or use default
+ const DEFAULT_MAX_RESULTS: usize = 1_000_000;
+ let max_results = std::env::var("PROLOG_MAX_RESULTS")
+     .ok()
+     .and_then(|s| s.parse::<usize>().ok())
+     .unwrap_or(DEFAULT_MAX_RESULTS);
```
118-118: Result sending should handle potential channel errors.

The code uses `let _ = response.send(...)` to silently ignore send errors, which occur when the receiver has been dropped. While this approach works, logging these occurrences could provide valuable diagnostic information.

```diff
- let _ = response.send(PrologServiceResponse::QueryResult(result));
+ if let Err(_) = response.send(PrologServiceResponse::QueryResult(result)) {
+     log::debug!("Failed to send query result - receiver dropped");
+ }
```

Also applies to: 156-156
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- rust-executor/src/prolog_service/engine.rs (5 hunks)
- rust-executor/src/prolog_service/engine_pool.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- rust-executor/src/prolog_service/engine_pool.rs
🔇 Additional comments (8)
rust-executor/src/prolog_service/engine.rs (8)
2-4: Well-structured import changes to accommodate the new implementation.

The imports have been properly updated to reflect the change from Tokio's async channels to the standard library's synchronous mpsc channels, along with the necessary thread synchronization primitives (Arc, Mutex).

Also applies to: 7-8
14-15: Properly updated request enum to use standard library channels.

The PrologServiceRequest enum has been correctly modified to use synchronous mpsc senders for responses instead of oneshot channels, aligning with the PR's objective to remove Tokio runtime dependencies.
32-33: Updated PrologEngine fields for thread-safe synchronous communication.

The engine's field types have been correctly updated to use standard mpsc channels, and the constructor properly wraps the receiver in the new SendableReceiver type with appropriate thread safety primitives.

Also applies to: 38-42
47-60: Good use of standard threads for Prolog execution instead of Tokio tasks.

The spawn method now correctly uses standard threads instead of Tokio tasks for Prolog execution, which aligns with the PR objective to reduce contention on Tokio worker threads. The initialization flow with synchronous communication is well-structured.
61-63: Improved message loop with synchronous receiving.

The change from async to synchronous message receiving simplifies the code and removes Tokio dependencies, which is in line with the PR objectives.
64-114: Robust implementation for handling panics during query execution.

This is a critical improvement that addresses the core issue described in the PR objectives. The implementation:

- Sets a maximum result limit to prevent allocation failures
- Wraps each call to `iterator.next()` in `catch_unwind` to handle panics at their source
- Adds comprehensive error reporting with query context
- Provides warning logs for truncated results

These changes effectively resolve the crashes that occurred during query execution.
163-170: Good use of spawn_blocking for waiting on synchronous responses.

The code correctly uses `tokio::task::spawn_blocking` to wait for responses from synchronous channels without blocking the Tokio runtime, maintaining the async interface for callers while using synchronous communication internally.

Also applies to: 180-187, 203-210
222-222: Explicit tokio import is good practice.

Adding the explicit import of tokio in the test module is a good practice for code clarity and dependency management.
only when the engine panics do we need to invalidate it. a failing query is a different error level.
Actionable comments posted: 0
♻️ Duplicate comments (1)
rust-executor/src/prolog_service/engine.rs (1)
155-156: 🛠️ Refactor suggestion

Add panic handling for module loading

The `LoadModuleString` handler doesn't have the same panic protection as the query execution. The `consult_module_string` method could potentially panic as well.

```diff
- machine.consult_module_string(module_name.as_str(), program);
- let _ = response.send(PrologServiceResponse::LoadModuleResult(Ok(())));
+ let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
+     machine.consult_module_string(module_name.as_str(), program.clone());
+     Ok(())
+ }));
+
+ match result {
+     Ok(ok) => {
+         let _ = response.send(PrologServiceResponse::LoadModuleResult(ok));
+     },
+     Err(e) => {
+         let error_string = if let Some(string) = e.downcast_ref::<String>() {
+             format!("Scryer panicked with: {:?} - when loading module: {}", string, module_name)
+         } else if let Some(&str) = e.downcast_ref::<&str>() {
+             format!("Scryer panicked with: {:?} - when loading module: {}", str, module_name)
+         } else {
+             format!("Scryer panicked with: {:?} - when loading module: {}", e, module_name)
+         };
+         log::error!("{}", error_string);
+         let _ = response.send(PrologServiceResponse::LoadModuleResult(Err(Error::msg(error_string))));
+     }
+ }
```
🧹 Nitpick comments (1)
rust-executor/src/prolog_service/engine.rs (1)
67-67: Consider making the maximum result limit configurable

The maximum result limit is hard-coded to 1,000,000. For better flexibility across different system configurations and memory constraints, consider making this configurable either through a constant at the module level or as a parameter.

```diff
- const MAX_RESULTS: usize = 1_000_000; // Adjust as needed
+ const MAX_RESULTS: usize = 1_000_000; // Default limit, can be overridden
```

You could define a configurable constant at the module level:

```rust
pub const DEFAULT_MAX_QUERY_RESULTS: usize = 1_000_000;
```

And then reference it in the implementation.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- rust-executor/src/prolog_service/engine.rs (4 hunks)
- rust-executor/src/prolog_service/engine_pool.rs (1 hunk)
- rust-executor/src/prolog_service/mod.rs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
- rust-executor/src/prolog_service/mod.rs
🔇 Additional comments (7)
rust-executor/src/prolog_service/engine_pool.rs (3)
90-101: Great addition of a centralized error handler!

This new `handle_engine_error` method properly centralizes error handling logic for Prolog engine failures, improving code organization and maintainability. It appropriately logs errors, invalidates failed engines, and returns a clear error message.
103-130: Clean refactoring of engine selection and query preprocessing

The updated `run_query` method now properly scopes the engine selection and query preprocessing logic, providing better organization with explicit destructuring of results and engine index. This change aligns well with the PR's goal of improving robustness under high-load conditions.
131-150: Clear distinction between engine failures and query failures

The updated match statement explicitly categorizes the different result states (engine panic, query failure, or success), which provides better error handling and makes it clear how each case is processed. This change directly addresses the PR objective of handling panics properly during query execution.
rust-executor/src/prolog_service/engine.rs (4)
2-9: Good replacement of Tokio channels with standard library synchronous channels

The switch from Tokio's asynchronous channels to standard library synchronous channels is well-implemented. The custom `SendableReceiver` with proper thread-safety guarantees ensures correct cross-thread communication while reducing overhead and contention, directly achieving the PR objective.

Also applies to: 27-34
46-61: Effective transition from Tokio runtime to dedicated thread

The spawn method now properly uses a dedicated thread instead of Tokio's runtime, which reduces overhead and contention on Tokio worker threads as mentioned in the PR objectives. The transition to blocking operations is well-implemented.
64-114: Robust panic handling implementation during iteration

The query execution has been greatly improved with:

- A manual loop instead of `collect()` that catches panics on each iterator step
- A maximum result limit of 1,000,000 to prevent allocation failures
- Proper termination of the loop on panic detection
- Detailed error reporting with query details

This directly addresses the PR's core objective of fixing crashes by catching panics during iteration.
180-193: Well-implemented asynchronous wrapper around synchronous communication

The `run_query` and `load_module_string` methods now properly spawn blocking tasks to handle the synchronous communication, maintaining the async interface while benefiting from the simpler synchronous channel implementation. This is a good approach that preserves the API while improving the internal implementation.

Also applies to: 200-216
Fix Flaky Crashes in PrologEngine by Handling Lazy Iterator Panics and Removing Tokio Runtime
This PR resolves flaky crashes in the PrologEngine under high-load conditions, where the application aborted due to a double panic during Scryer Prolog query execution. The issue was introduced by a Scryer Prolog update (commit 55f07af) that changed `run_query` to return a lazy iterator, deferring Prolog engine execution to the iterator's `next()` method. Additionally, the PR improves performance by removing an unnecessary Tokio runtime from the Scryer Prolog thread.

Root Cause

- The update caused panics (at `src/machine/mod.rs:1204`) to occur during `collect::<Result<Vec<LeafAnswer>, Term>>` in `PrologEngine::spawn`.
- Execution was wrapped in `catch_unwind`, but a secondary panic during cleanup (e.g., dropping the iterator or Vec) triggered `panic_in_cleanup`, causing an abort.

Changes

- Replaced `collect` with a manual loop in `PrologEngine::spawn`, wrapping each `iterator.next()` call in `std::panic::catch_unwind` to catch panics from the Prolog engine at the source.

Impact
Maintains Functionality: Preserves the async interface for run_query and load_module_string while isolating Prolog execution.