
Fix crash when creating Flux channel with all Plugins#600

Merged
lucksus merged 11 commits into dev from feature/fix-crashes-with-all-plugins on Apr 25, 2025
Conversation

@lucksus
Member

@lucksus lucksus commented Apr 24, 2025

Fix Flaky Crashes in PrologEngine by Handling Lazy Iterator Panics and Removing Tokio Runtime

This PR resolves flaky crashes in the PrologEngine under high-load conditions, where the application aborted due to a double panic during Scryer Prolog query execution. The issue was introduced by a Scryer Prolog update (commit 55f07af) that changed run_query to return a lazy iterator, deferring Prolog engine execution to the iterator’s next() method. Additionally, the PR improves performance by removing an unnecessary Tokio runtime from the Scryer Prolog thread.

Root Cause

  • The Scryer Prolog update made run_query return a lazy iterator, causing panics (e.g., at src/machine/mod.rs:1204) to occur during collect::<Result<Vec<LeafAnswer>, Term>> in PrologEngine::spawn.
  • An initial panic (e.g., from allocation failure or Prolog engine error) was caught by catch_unwind, but a secondary panic during cleanup (e.g., dropping the iterator or Vec) triggered panic_in_cleanup, causing an abort.
  • The original code used a Tokio runtime in the Scryer thread for async channels, adding overhead and complicating panic handling in the current_thread scheduler.
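The deferred-execution behavior at the heart of the bug can be illustrated in isolation. The sketch below (illustrative names, not Scryer's actual API) shows how a lazy iterator postpones a panic until `next()` is called, which is why the crash surfaced far from the engine, inside `collect`:

```rust
use std::panic::{self, AssertUnwindSafe};

/// A lazy iterator whose panic is deferred until `next()` is called,
/// mirroring how the updated `run_query` defers engine execution.
/// (`lazy_query` is a stand-in, not a real Scryer function.)
fn lazy_query() -> impl Iterator<Item = usize> {
    (0..).map(|i| {
        if i == 2 {
            panic!("machine error at step {}", i);
        }
        i
    })
}

fn main() {
    // Constructing the iterator does not panic...
    let mut iter = lazy_query();
    assert_eq!(iter.next(), Some(0));
    assert_eq!(iter.next(), Some(1));

    // ...the panic only fires inside next(), so it must be caught there
    // rather than around the call that built the iterator.
    let caught = panic::catch_unwind(AssertUnwindSafe(|| iter.next()));
    assert!(caught.is_err());
}
```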

Changes

  • Replaced collect with a manual loop in PrologEngine::spawn, wrapping each iterator.next() call in std::panic::catch_unwind to catch panics from the Prolog engine at the source.
  • Added a MAX_RESULTS limit (1,000,000) to cap Vec growth, preventing allocation failures.
  • Hardened cleanup logic by safely handling mpsc::Sender errors and explicitly resetting the Machine on panic.
  • Removed the Tokio runtime from the Scryer thread, replacing async channels (tokio::sync::mpsc) with synchronous ones (std::sync::mpsc) and moving async receiver logic to the caller via std::thread::spawn.
  • Replaced tokio::task::spawn_blocking with std::thread::spawn for receiving responses, reducing Tokio worker thread contention.
  • Added logging for query result sizes and warnings for truncated results to aid debugging.
  • Fixed the handling of panics in the engine pool to invalidate broken engines (an error path we never hit before the panic catching itself was fixed).
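The loop-based collection described above can be sketched roughly as follows. This is an illustrative reduction, not the PR's actual code: the iterator and `MAX_RESULTS` value are stand-ins (the real limit is 1,000,000), and the real loop yields `LeafAnswer`s from the Scryer machine:

```rust
use std::panic::{self, AssertUnwindSafe};

const MAX_RESULTS: usize = 5; // illustrative; the PR uses 1_000_000

/// Collect results one step at a time, catching panics at the source
/// instead of letting them escape during a later collect()/Drop.
fn collect_capped<I: Iterator<Item = u64>>(mut iter: I) -> Result<Vec<u64>, String> {
    let mut results = Vec::new();
    loop {
        if results.len() >= MAX_RESULTS {
            eprintln!("warning: result limit {} reached, truncating", MAX_RESULTS);
            break;
        }
        // Wrap each next() so a panic in the engine surfaces here, cleanly.
        match panic::catch_unwind(AssertUnwindSafe(|| iter.next())) {
            Ok(Some(answer)) => results.push(answer),
            Ok(None) => break, // iterator exhausted normally
            Err(e) => {
                let msg = e
                    .downcast_ref::<&str>()
                    .map(|s| s.to_string())
                    .or_else(|| e.downcast_ref::<String>().cloned())
                    .unwrap_or_else(|| "unknown panic".into());
                return Err(format!("engine panicked: {}", msg));
            }
        }
    }
    Ok(results)
}

fn main() {
    assert_eq!(collect_capped(1..=3), Ok(vec![1, 2, 3]));
    assert_eq!(collect_capped(1..=100).unwrap().len(), MAX_RESULTS);
    let panicky = (0u64..).map(|i| if i == 1 { panic!("boom") } else { i });
    assert!(collect_capped(panicky).is_err());
}
```

Because the error is returned as a `Result` rather than propagated as an unwind, nothing can panic a second time during cleanup, which is what previously triggered the abort.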

Impact

  • Fixes Crashes: The loop-based approach catches panics in next(), avoids collect allocation failures, and prevents secondary cleanup panics, ensuring robust query execution.
  • Performance Improvement: Eliminating the Tokio runtime reduces overhead (e.g., runtime initialization, async scheduling), resulting in faster Scryer Prolog query execution.
  • Better Diagnostics: Logging result counts helps identify large queries that could strain resources.

  • Maintains Functionality: Preserves the async interface for run_query and load_module_string while isolating Prolog execution.
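The worker-thread pattern the PR moves to can be sketched with only the standard library. The `Request` enum and `spawn_worker` below are illustrative placeholders, not the actual `PrologService` types, but they show the shape: a dedicated thread owns the engine and is driven purely by synchronous `mpsc` messages carrying a reply sender:

```rust
use std::sync::mpsc;
use std::thread;

// Illustrative request type: each query carries its own reply channel,
// replacing the tokio oneshot channels used previously.
enum Request {
    Query(String, mpsc::Sender<String>),
    Stop,
}

/// Spawn a worker thread that owns the "engine" and serves requests
/// over a synchronous channel — no Tokio runtime needed on this thread.
fn spawn_worker() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        while let Ok(req) = rx.recv() {
            match req {
                Request::Query(q, reply) => {
                    // stand-in for machine.run_query(q)
                    let _ = reply.send(format!("answer({})", q));
                }
                Request::Stop => break,
            }
        }
    });
    tx
}

fn main() {
    let worker = spawn_worker();
    let (reply_tx, reply_rx) = mpsc::channel();
    worker.send(Request::Query("foo".into(), reply_tx)).unwrap();
    // In the real code this blocking recv runs inside spawn_blocking or a
    // std::thread, keeping the caller-facing async API intact.
    assert_eq!(reply_rx.recv().unwrap(), "answer(foo)");
    let _ = worker.send(Request::Stop);
}
```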

Summary by CodeRabbit

  • Bug Fixes

    • Improved error handling during query execution, ensuring that engine failures are logged and problematic engines are invalidated to prevent further issues.
    • Enhanced panic detection and error reporting for query iteration, providing clearer diagnostic messages.
    • Added a limit on query result size to prevent excessive resource use, with warnings logged when the limit is reached.
  • Refactor

    • Switched internal communication from asynchronous to synchronous channels, resulting in more predictable and robust request processing.
    • Consolidated error handling logic for engine errors into a dedicated method for consistency and maintainability.
    • Updated query execution to process results iteratively with per-step panic handling for increased stability.
  • Chores

    • Updated internal test imports without affecting user-facing functionality.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Apr 24, 2025

Walkthrough

The Prolog service engine has been refactored to replace asynchronous Tokio-based channels and runtime with Rust's standard synchronous mpsc channels, protected by Arc and Mutex for thread safety. The engine's request and response handling now uses blocking threads and synchronous message passing. Query execution logic was updated to limit the number of results, handle panics per iteration, and provide detailed error reporting. The engine pool now centralizes error handling for engine failures in a dedicated method, and adjusts to the new nested result structure from query execution. Test code received a minor import update and a slight change in error message text in assertions.

Changes

File(s) Change Summary
rust-executor/src/prolog_service/engine.rs Replaced Tokio async channels and runtime with synchronous mpsc channels using Arc and Mutex. Updated internal request/response handling to use blocking threads. Added a SendableReceiver struct with unsafe Sync. Modified query execution to cap results, catch panics per iteration, and provide detailed errors. Updated public API to use mpsc senders instead of oneshot channels.
rust-executor/src/prolog_service/engine_pool.rs Added a private async method for centralized error handling (handle_engine_error). Updated run_query to use this method for both outer and inner errors, and to handle nested Result types from query execution.
rust-executor/src/prolog_service/mod.rs Modified test assertions' error message strings from "Error running query" to "no error running query"; no logic changes.
rust-executor/src/prolog_service/engine.rs (tests) Added explicit import of tokio; no functional changes.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant EnginePool
    participant PrologEngine
    participant WorkerThread

    Client->>EnginePool: run_query(query)
    EnginePool->>PrologEngine: run_query(query)
    PrologEngine->>WorkerThread: Send request via mpsc channel
    WorkerThread-->>PrologEngine: Process query, collect results (limit 1M, catch panics)
    PrologEngine-->>EnginePool: Return QueryResult (or Error)
    EnginePool-->>Client: Return QueryResult or handle error (invalidate engine if needed)

Poem

In the warren of code, we’ve dug anew,
Async to sync, our channels grew.
Panics caught, results in bounds,
Errors handled, safety surrounds.
Engines now rest on mutex beds,
As rabbits rejoice, hopping ahead!
🐇✨



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🔭 Outside diff range comments (1)
rust-executor/src/prolog_service/engine_pool.rs (1)

98-116: 🛠️ Refactor suggestion

⚠️ Potential issue

Release the read lock before awaiting or upgrading to a write lock to avoid deadlocks

run_query acquires an RwLockReadGuard (engines) and keeps it alive until the end of the function.
When an error occurs we delegate to handle_engine_error, which tries to acquire a write lock on the same RwLock.
Because the read-guard is still held, the task will hang forever – Tokio’s RwLock is not re-entrant and cannot upgrade from read to write.

In addition, the read-guard is also held across the potentially long-running await of engine.run_query, blocking every other writer in the system.

A minimal fix is to drop the guard once we have extracted the engine slot index:

- let engines = self.engines.read().await;
+ let engines_guard = self.engines.read().await;

 ...
- let (engine_idx, engine) = valid_engines[idx];
+ let (engine_idx, engine) = valid_engines[idx];
+
+ // We no longer need the lock; release it before the await below
+ drop(engines_guard);

That single drop():

  1. Eliminates the dead-lock when handle_engine_error tries to write.
  2. Frees other tasks to write / invalidate engines while the query is executing.

Without this change the pool will hang on the first engine panic under load.
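The read-then-write conflict the reviewer describes is easy to reproduce. The sketch below uses std's `RwLock` for a runnable illustration (the pool uses Tokio's `RwLock`, where the equivalent `.write().await` would simply never resolve instead of returning an error):

```rust
use std::sync::RwLock;

fn main() {
    let engines = RwLock::new(vec!["engine0", "engine1"]);

    let guard = engines.read().unwrap();
    // While any read guard is alive, no writer can acquire the lock —
    // this is the state run_query is in when handle_engine_error
    // tries to take the write lock.
    assert!(engines.try_write().is_err());

    // Dropping the guard before the error path frees the writer.
    drop(guard);
    assert!(engines.try_write().is_ok());
}
```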

Also applies to: 124-139

🧹 Nitpick comments (3)
rust-executor/src/prolog_service/engine_pool.rs (1)

90-97: Minor: include engine index in the log and consider returning the original error type

Nice centralisation of the failure path!
Two micro suggestions:

  • Logging the engine_idx helps correlate with other metrics (memory, CPU, etc.).
  • handle_engine_error boxes the original error into anyhow!, losing its concrete type. Forwarding the Error unchanged lets callers down-cast if they want.

Both are optional, but improve observability and debuggability.

rust-executor/src/prolog_service/engine.rs (2)

55-63: Hold the Mutex only while receiving – prevent accidental starvation

The current pattern locks the Receiver once, then holds the mutex for the entire lifetime of the service loop:

let receiver = receiver.0.lock().unwrap(); // lock never released
while let Ok(message) = receiver.recv() { ... }

If additional threads are ever introduced that also need to recv, they will starve forever.
A safer pattern is:

-loop {
-    let message = {
-        let receiver_guard = receiver.0.lock().unwrap();
-        receiver_guard.recv()
-    };
-    match message {
+loop {
+    let message = {
+        let guard = receiver.0.lock().unwrap();
+        guard.recv()
+    };
+    match message {
         ...
     }
 }

This keeps the critical section minimal and future-proofs the code.


67-113: Potential OOM risk when collecting up to 1 000 000 answers

Vec::with_capacity(MAX_RESULTS) is not used, so the vector will realloc several times and may still attempt to store a million possibly large Terms, exhausting memory.

Consider streaming results back to the caller or switching to a chunked protocol (e.g. send every N answers) to bound peak memory usage. If a hard cap is still required, pre-allocate and early-return when the limit is hit to avoid extra pushes.

Not critical for correctness, but worth revisiting if queries may return large payloads.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c8d3640 and 395a62a.

📒 Files selected for processing (2)
  • rust-executor/src/prolog_service/engine.rs (5 hunks)
  • rust-executor/src/prolog_service/engine_pool.rs (2 hunks)


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
rust-executor/src/prolog_service/engine.rs (3)

27-29: Consider documenting the SendableReceiver wrapper and its safety implications.

The new SendableReceiver wrapper is crucial for thread safety, but lacks documentation explaining its purpose and safety guarantees. The unsafe Sync implementation needs justification.

+/// Wrapper around mpsc::Receiver that makes it sharable between threads.
+/// This is safe because we protect the receiver with a mutex.
 struct SendableReceiver<T>(Arc<Mutex<mpsc::Receiver<T>>>);

+/// Safe because the inner Receiver is protected by a Mutex
 unsafe impl<T> Sync for SendableReceiver<T> {}
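It's worth noting that `Mutex<mpsc::Receiver<T>>` is already `Sync` when `T: Send`, so in many setups the `unsafe impl` can be avoided entirely. A minimal safe sketch of sharing a receiver across threads (illustrative, not the PR's exact code):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // Arc<Mutex<Receiver>> is Send + Sync without any unsafe impl:
    // the Mutex serializes access to the !Sync Receiver.
    let shared = Arc::new(Mutex::new(rx));
    let shared2 = Arc::clone(&shared);

    let handle = thread::spawn(move || shared2.lock().unwrap().recv().unwrap());
    tx.send(7).unwrap();
    assert_eq!(handle.join().unwrap(), 7);
}
```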

67-67: Consider making MAX_RESULTS configurable.

The hard-coded limit of 1,000,000 results might not be appropriate for all use cases. Consider making this configurable through a parameter or environment variable.

-            const MAX_RESULTS: usize = 1_000_000; // Adjust as needed
+            // Get limit from config or use default
+            const DEFAULT_MAX_RESULTS: usize = 1_000_000;
+            let max_results = std::env::var("PROLOG_MAX_RESULTS")
+                .ok()
+                .and_then(|s| s.parse::<usize>().ok())
+                .unwrap_or(DEFAULT_MAX_RESULTS);

118-118: Result sending should handle potential channel errors.

The code uses let _ = response.send(...) to silently ignore send errors, which occurs when the receiver has been dropped. While this approach works, logging these occurrences could provide valuable diagnostic information.

-                                let _ = response.send(PrologServiceResponse::QueryResult(result));
+                                if let Err(_) = response.send(PrologServiceResponse::QueryResult(result)) {
+                                    log::debug!("Failed to send query result - receiver dropped");
+                                }

Also applies to: 156-156

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 395a62a and f0f0ab7.

📒 Files selected for processing (2)
  • rust-executor/src/prolog_service/engine.rs (5 hunks)
  • rust-executor/src/prolog_service/engine_pool.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • rust-executor/src/prolog_service/engine_pool.rs
🔇 Additional comments (8)
rust-executor/src/prolog_service/engine.rs (8)

2-4: Well-structured import changes to accommodate the new implementation.

The imports have been properly updated to reflect the change from Tokio's async channels to standard library's synchronous mpsc channels, along with the necessary thread synchronization primitives (Arc, Mutex).

Also applies to: 7-8


14-15: Properly updated request enum to use standard library channels.

The PrologServiceRequest enum has been correctly modified to use synchronous mpsc senders for responses instead of oneshot channels, aligning with the PR's objective to remove Tokio runtime dependencies.


32-33: Updated PrologEngine fields for thread-safe synchronous communication.

The engine's field types have been correctly updated to use standard mpsc channels, and the constructor properly wraps the receiver in the new SendableReceiver type with appropriate thread safety primitives.

Also applies to: 38-42


47-60: Good use of standard threads for Prolog execution instead of Tokio tasks.

The spawn method now correctly uses standard threads instead of Tokio tasks for Prolog execution, which aligns with the PR objective to reduce contention on Tokio worker threads. The initialization flow with synchronous communication is well-structured.


61-63: Improved message loop with synchronous receiving.

The change from async to synchronous message receiving simplifies the code and removes Tokio dependencies, which is in line with the PR objectives.


64-114: Robust implementation for handling panics during query execution.

This is a critical improvement that addresses the core issue described in the PR objectives. The implementation:

  1. Sets a maximum result limit to prevent allocation failures
  2. Wraps each call to iterator.next() in catch_unwind to handle panics at their source
  3. Adds comprehensive error reporting with query context
  4. Provides warning logs for truncated results

These changes effectively resolve the crashes that occurred during query execution.


163-170: Good use of spawn_blocking for waiting on synchronous responses.

The code correctly uses tokio::task::spawn_blocking to wait for responses from synchronous channels without blocking the Tokio runtime, maintaining the async interface for callers while using synchronous communication internally.

Also applies to: 180-187, 203-210


222-222: Explicit tokio import is good practice.

Adding the explicit import of tokio in the test module is a good practice for code clarity and dependency management.

lucksus added 3 commits April 25, 2025 14:58
only when the engine panics do we need to invalidate it. a failing query is a different error level.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
rust-executor/src/prolog_service/engine.rs (1)

155-156: 🛠️ Refactor suggestion

Add panic handling for module loading

The LoadModuleString handler doesn't have the same panic protection as the query execution. The consult_module_string method could potentially panic as well.

-                        machine.consult_module_string(module_name.as_str(), program);
-                        let _ = response.send(PrologServiceResponse::LoadModuleResult(Ok(())));
+                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
+                            machine.consult_module_string(module_name.as_str(), program.clone());
+                            Ok(())
+                        }));
+                        
+                        match result {
+                            Ok(ok) => {
+                                let _ = response.send(PrologServiceResponse::LoadModuleResult(ok));
+                            },
+                            Err(e) => {
+                                let error_string = if let Some(string) = e.downcast_ref::<String>() {
+                                    format!("Scryer panicked with: {:?} - when loading module: {}", string, module_name)
+                                } else if let Some(&str) = e.downcast_ref::<&str>() {
+                                    format!("Scryer panicked with: {:?} - when loading module: {}", str, module_name)
+                                } else {
+                                    format!("Scryer panicked with: {:?} - when loading module: {}", e, module_name)
+                                };
+                                log::error!("{}", error_string);
+                                let _ = response.send(PrologServiceResponse::LoadModuleResult(Err(Error::msg(error_string))));
+                            }
+                        }
🧹 Nitpick comments (1)
rust-executor/src/prolog_service/engine.rs (1)

67-67: Consider making the maximum result limit configurable

The maximum result limit is hard-coded to 1,000,000. For better flexibility across different system configurations and memory constraints, consider making this configurable either through a constant at the module level or as a parameter.

-                            const MAX_RESULTS: usize = 1_000_000; // Adjust as needed
+                            const MAX_RESULTS: usize = 1_000_000; // Default limit, can be overridden

You could define a configurable constant at the module level:

pub const DEFAULT_MAX_QUERY_RESULTS: usize = 1_000_000;

And then reference it in the implementation.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9de0944 and bf5ccb6.

📒 Files selected for processing (3)
  • rust-executor/src/prolog_service/engine.rs (4 hunks)
  • rust-executor/src/prolog_service/engine_pool.rs (1 hunks)
  • rust-executor/src/prolog_service/mod.rs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • rust-executor/src/prolog_service/mod.rs
🔇 Additional comments (7)
rust-executor/src/prolog_service/engine_pool.rs (3)

90-101: Great addition of a centralized error handler!

This new handle_engine_error method properly centralizes error handling logic for Prolog engine failures, improving code organization and maintainability. It appropriately logs errors, invalidates failed engines, and returns a clear error message.


103-130: Clean refactoring of engine selection and query preprocessing

The updated run_query method now properly scopes the engine selection and query preprocessing logic, providing better organization with explicit destructuring of results and engine index. This change aligns well with the PR's goal of improving robustness under high-load conditions.


131-150: Clear distinction between engine failures and query failures

The updated match statement explicitly categorizes the different result states (engine panic, query failure, or success), which provides better error handling and makes it clear how each case is processed. This change directly addresses the PR objective of handling panics properly during query execution.

rust-executor/src/prolog_service/engine.rs (4)

2-9: Good replacement of Tokio channels with standard library synchronous channels

The switch from Tokio's asynchronous channels to standard library synchronous channels is well-implemented. The custom SendableReceiver with proper thread-safety guarantees ensures correct cross-thread communication while reducing overhead and contention, directly achieving the PR objective.

Also applies to: 27-34


46-61: Effective transition from Tokio runtime to dedicated thread

The spawn method now properly uses a dedicated thread instead of Tokio's runtime, which reduces overhead and contention on Tokio worker threads as mentioned in the PR objectives. The transition to blocking operations is well-implemented.


64-114: Robust panic handling implementation during iteration

The query execution has been greatly improved with:

  1. A manual loop instead of collect() that catches panics on each iterator step
  2. A maximum result limit of 1,000,000 to prevent allocation failures
  3. Proper termination of the loop on panic detection
  4. Detailed error reporting with query details

This directly addresses the PR's core objective of fixing crashes by catching panics during iteration.


180-193: Well-implemented asynchronous wrapper around synchronous communication

The run_query and load_module_string methods now properly spawn blocking tasks to handle the synchronous communication, maintaining the async interface while benefiting from the simpler synchronous channel implementation. This is a good approach that preserves the API while improving the internal implementation.

Also applies to: 200-216

@lucksus lucksus merged commit 5236a4e into dev Apr 25, 2025
3 checks passed
@lucksus lucksus deleted the feature/fix-crashes-with-all-plugins branch August 22, 2025 12:01