feat(plugin): add tool call anomaly detection plugin #3846

anujshrivastava15 wants to merge 1 commit into IBM:main
Conversation
Pull request overview
Adds a new security-focused plugin to detect anomalous MCP tool-calling behavior by learning per-user baselines and scoring deviations, and wires it into the default plugin configuration alongside unit tests and plugin documentation.
Changes:
- Introduces `ToolCallAnomalyDetectionPlugin` with per-user baseline learning and anomaly scoring on `tool_pre_invoke`, plus metadata enrichment on `tool_post_invoke`.
- Adds unit tests for learning/detection behavior, identity extraction, and pruning.
- Adds plugin manifest/README and registers the plugin in `plugins/config.yaml` (disabled by default).
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| `plugins/tool_call_anomaly_detection/tool_call_anomaly_detection.py` | New plugin implementation for baseline learning + anomaly scoring and (optional) blocking. |
| `tests/unit/plugins/test_tool_call_anomaly_detection.py` | Unit tests covering learning vs detection, burst/novelty/off-hours, identity extraction, pruning. |
| `plugins/tool_call_anomaly_detection/plugin-manifest.yaml` | Plugin manifest with hooks and default configuration. |
| `plugins/tool_call_anomaly_detection/README.md` | Plugin usage/config documentation and described signals/metadata. |
| `plugins/config.yaml` | Registers the new plugin in the default plugin list (disabled). |
```python
self._cfg = AnomalyDetectionConfig(**(config.config or {}))
self._baselines: Dict[str, _UserBaseline] = {}

# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

def _get_user_id(self, context: PluginContext) -> str:
    gc = context.global_context
    if isinstance(gc.user, dict):
        return gc.user.get("email", gc.user.get("sub", "anonymous"))
    return gc.user or "anonymous"

def _get_baseline(self, user_id: str) -> _UserBaseline:
    if user_id not in self._baselines:
        self._baselines[user_id] = _UserBaseline()
    return self._baselines[user_id]

def _is_learning(self, baseline: _UserBaseline) -> bool:
    return (time.time() - baseline.first_seen) < self._cfg.learning_window_seconds

def _prune_history(self, baseline: _UserBaseline) -> None:
    if len(baseline.call_history) > self._cfg.max_history_per_user:
        baseline.call_history = baseline.call_history[-self._cfg.max_history_per_user:]
```
Per-user baseline memory is only bounded for call_history; known_tools, known_arg_signatures, tool_counts, and even _baselines itself can grow without limit as new users/tools/arg-shapes appear. This can lead to unbounded memory growth under long-running workloads or adversarial tool names. Consider adding eviction (TTL/LRU), maximum tracked users, and/or capping per-user distinct tools/arg signatures when max_history_per_user is exceeded.
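One way to implement the suggested eviction is an LRU-capped store with a per-user cap on distinct tools. This is a minimal sketch of the idea, not the plugin's code: `BaselineStore`, `max_users`, and the simplified `Baseline` dataclass are illustrative names, and the real `_UserBaseline` has more fields.

```python
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Set


@dataclass
class Baseline:
    known_tools: Set[str] = field(default_factory=set)


class BaselineStore:
    """Keeps at most `max_users` baselines, evicting the least recently used."""

    def __init__(self, max_users: int = 1000, max_tools_per_user: int = 256):
        self._max_users = max_users
        self._max_tools = max_tools_per_user
        self._baselines: "OrderedDict[str, Baseline]" = OrderedDict()

    def get(self, user_id: str) -> Baseline:
        if user_id in self._baselines:
            self._baselines.move_to_end(user_id)  # mark as recently used
        else:
            if len(self._baselines) >= self._max_users:
                self._baselines.popitem(last=False)  # evict LRU entry
            self._baselines[user_id] = Baseline()
        return self._baselines[user_id]

    def record_tool(self, user_id: str, tool: str) -> None:
        baseline = self.get(user_id)
        # Cap distinct tools so adversarial tool names cannot grow memory unboundedly.
        if tool in baseline.known_tools or len(baseline.known_tools) < self._max_tools:
            baseline.known_tools.add(tool)


store = BaselineStore(max_users=2)
store.record_tool("alice", "db_query")
store.record_tool("bob", "search")
store.record_tool("carol", "fetch")  # evicts "alice" (least recently used)
print(sorted(store._baselines))  # ['bob', 'carol']
```

A TTL check on last access (stored alongside the baseline) could be layered on the same structure for long-idle users.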
```markdown
- `tool_pre_invoke` — Scores the incoming call against the user's baseline; warns or blocks above threshold
- `tool_post_invoke` — Records the call into the baseline and enriches response metadata with risk scores
```
The README states tool_post_invoke "records the call into the baseline", but the implementation only reads anomaly_meta from context and returns it; baseline recording happens in tool_pre_invoke. Please update the README to match actual behavior, or update tool_post_invoke to perform the recording if that's the intended design.
Suggested change:
```diff
-- `tool_pre_invoke` — Scores the incoming call against the user's baseline; warns or blocks above threshold
-- `tool_post_invoke` — Records the call into the baseline and enriches response metadata with risk scores
+- `tool_pre_invoke` — Scores the incoming call against the user's baseline, updates the baseline, and may warn or block above threshold
+- `tool_post_invoke` — Reads anomaly metadata from context and enriches the tool response with risk scores and related fields
```
```markdown
- `anomaly_signals` — Dict of individual signal scores (novelty, burst, frequency)
- `anomaly_off_hours` — Whether off-hours bonus was applied
- `anomaly_action` — Action taken (allow / warn / block)
```
The README lists metadata fields (anomaly_signals, anomaly_action) that the plugin never sets, and the plugin instead emits separate keys (anomaly_novelty, anomaly_burst, anomaly_frequency, etc.). Please update the README to match the emitted metadata, or update the plugin to emit the documented fields.
Suggested change:
```diff
-- `anomaly_signals` — Dict of individual signal scores (novelty, burst, frequency)
-- `anomaly_off_hours` — Whether off-hours bonus was applied
-- `anomaly_action` — Action taken (allow / warn / block)
+- `anomaly_novelty` — Novelty signal score contributing to the composite risk
+- `anomaly_burst` — Burst-rate signal score contributing to the composite risk
+- `anomaly_frequency` — Frequency-pattern signal score contributing to the composite risk
+- `anomaly_off_hours` — Whether an off-hours bonus was applied when scoring
```
```yaml
tags: ["security", "anomaly-detection", "behavioral", "audit"]
mode: "disabled" # set to "permissive" to observe, "enforce" to block
priority: 201 # Run after telemetry exporter, before late-stage plugins
conditions: []
```
This configuration comment implies that setting mode: "enforce" is sufficient to block, but the plugin only blocks when action is set to "block" and the framework mode is enforce (permissive mode will log and continue). Please clarify the comment (and/or set action: "block" in the enforce example) to avoid misconfiguration.
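A clearer enforce example would pair the framework `mode` with the plugin-level `action` explicitly. This is a sketch based on the fields visible in this PR, not the final manifest wording:

```yaml
- name: tool_call_anomaly_detection
  mode: "enforce"        # framework must be enforcing for blocks to take effect
  config:
    action: "block"      # plugin-level action must also be "block"; "warn" never blocks
```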
```python
async def test_off_hours_bonus(self):
    plugin = ToolCallAnomalyDetectionPlugin(
        _make_config(
            learning_window_seconds=0,
            off_hours_start=0,
            off_hours_end=23,  # always off-hours
            off_hours_score_bonus=0.15,
        )
    )
    baseline = plugin._get_baseline("alice@example.com")
    baseline.first_seen = time.time() - 7200
    baseline.known_tools.add("db_query")
    baseline.known_arg_signatures["db_query"].add(frozenset(["query"]))
    baseline.tool_counts["db_query"] = 50

    ctx = _make_context()
    result = await plugin.tool_pre_invoke(_pre_payload("db_query", {"query": "x"}), ctx)

    assert result.metadata["anomaly_off_hours"] is True
```
test_off_hours_bonus depends on the real current UTC hour (_is_off_hours() uses datetime.now(timezone.utc)), so it will fail when tests run at 23:xx UTC because off_hours_end=23 makes hour 23 not off-hours. Please mock the clock (e.g., patch datetime.datetime.now or refactor the plugin to use an injectable time source) so the test is deterministic.
```python
# Always record (keeps baseline fresh)
self._record_call(user_id, tool_name, arg_keys, now)

meta: Dict[str, Any] = {
    "anomaly_risk_score": round(risk_score, 4),
    "anomaly_novelty": round(novelty, 4),
    "anomaly_burst": round(burst, 4),
    "anomaly_frequency": round(frequency, 4),
    "anomaly_off_hours": self._is_off_hours(),
    "anomaly_user": user_id,
    "anomaly_tool": tool_name,
}

# Save for post_invoke enrichment
context.set_state("anomaly_meta", meta)
context.set_state("anomaly_risk_score", risk_score)

if risk_score >= self._cfg.block_threshold and self._cfg.action == "block":
    logger.warning(
        "Anomaly detection: blocking tool call %s for user %s (risk=%.2f)",
        tool_name,
        user_id,
        risk_score,
    )
    return ToolPreInvokeResult(
```
The plugin records the call into the baseline (_record_call) before deciding to block. If a call is blocked, adding it to known_tools/signatures can reduce novelty risk for subsequent attempts and effectively lets a blocked attempt “train” the baseline. Consider recording only after allow/warn, or recording blocked calls separately without promoting them into the baseline used for novelty scoring.
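The decide-then-record ordering the review suggests can be sketched with a toy novelty-only scorer. Names such as `handle_call` and `blocked_calls` are illustrative, not the plugin's API; the point is that blocked calls go to a separate audit list and never update the baseline used for novelty scoring.

```python
from typing import List, Set

known_tools: Set[str] = {"db_query"}
blocked_calls: List[str] = []  # audit trail; never consulted for novelty scoring


def score(tool: str) -> float:
    # Toy novelty-only score: unseen tools are maximally risky.
    return 0.0 if tool in known_tools else 1.0


def handle_call(tool: str, block_threshold: float = 0.8) -> str:
    risk = score(tool)
    if risk >= block_threshold:
        blocked_calls.append(tool)  # record separately; baseline is not "trained"
        return "block"
    known_tools.add(tool)  # only allowed calls update the novelty baseline
    return "allow"


print(handle_call("exfiltrate_db"))  # block
print(handle_call("exfiltrate_db"))  # still block: the baseline was not polluted
print(handle_call("db_query"))  # allow
```

With the record-first ordering in the quoted code above, the second attempt would instead score as non-novel and could slip under the threshold.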
```python
import math
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple
```
math and several typing imports (Optional, Tuple) are unused in this module and will fail linting. Please remove unused imports or use them where intended.
Suggested change:
```diff
-import math
-import time
-from collections import defaultdict
-from typing import Any, Dict, List, Optional, Set, Tuple
+import time
+from collections import defaultdict
+from typing import Any, Dict, List, Set
```
```markdown
### Anomaly Signals
- **Novelty** — Tool name never seen in the user's history
- **Burst** — Call rate exceeds threshold within the sliding window
- **Frequency** — Tool called significantly more than its historical average
```
The "Frequency" signal description says "called significantly more than its historical average", but _score_frequency flags tools that are rarely used (tool_fraction < 0.01). Please align the README wording with the implemented heuristic or adjust the scoring logic to match the documented behavior.
Suggested change:
```diff
-- **Frequency** — Tool called significantly more than its historical average
+- **Frequency** — Tool called significantly less than its historical average (unusually rare usage)
```
```python
from unittest.mock import patch

import pytest

from mcpgateway.plugins.framework import (
    GlobalContext,
    PluginConfig,
    PluginContext,
    ToolPostInvokePayload,
    ToolPreInvokePayload,
)
from mcpgateway.plugins.framework.hooks.tools import ToolHookType
from plugins.tool_call_anomaly_detection.tool_call_anomaly_detection import (
    AnomalyDetectionConfig,
    ToolCallAnomalyDetectionPlugin,
)
```
patch and AnomalyDetectionConfig are imported but never used in this test module, which will fail linting. Please remove unused imports or use patch to make the time-based tests deterministic (see off-hours test below).
Add a new security plugin that learns per-user tool-calling baselines and flags behavioral anomalies: burst invocations, novel tool access, unusual frequency patterns, and off-hours activity.

Closes IBM#3845

Signed-off-by: Anuj Shrivastava <ashrivastava@ibm.com>
Force-pushed from e84d1a7 to 221f1df.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
```python
block_threshold: float = Field(default=0.8, ge=0.0, le=1.0)
warn_threshold: float = Field(default=0.5, ge=0.0, le=1.0)
max_history_per_user: int = Field(default=1000, ge=10)
off_hours_start: int = Field(default=22, ge=0, le=23)
off_hours_end: int = Field(default=6, ge=0, le=23)
off_hours_score_bonus: float = Field(default=0.15, ge=0.0, le=1.0)
action: str = Field(default="warn")  # "warn" | "block"
```
action is a free-form string and there’s no validation that warn_threshold <= block_threshold. A typo (e.g., "blok") would silently disable blocking, and inverted thresholds can lead to confusing behavior. Consider using a Literal["warn","block"]/Enum for action and adding a Pydantic validator to enforce sensible threshold ordering.
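The suggested hardening could look like the following sketch. It assumes Pydantic v2 (`model_validator`); field names mirror the PR's `AnomalyDetectionConfig`, but the class here is a standalone illustration, not the plugin's code.

```python
from typing import Literal

from pydantic import BaseModel, Field, model_validator


class AnomalyDetectionConfig(BaseModel):
    block_threshold: float = Field(default=0.8, ge=0.0, le=1.0)
    warn_threshold: float = Field(default=0.5, ge=0.0, le=1.0)
    # A typo like "blok" now fails at config load instead of silently disabling blocking.
    action: Literal["warn", "block"] = "warn"

    @model_validator(mode="after")
    def _check_threshold_order(self) -> "AnomalyDetectionConfig":
        # Inverted thresholds would make every warn-level score also a block-level score.
        if self.warn_threshold > self.block_threshold:
            raise ValueError("warn_threshold must be <= block_threshold")
        return self


cfg = AnomalyDetectionConfig(action="block")
print(cfg.action)  # block
```

Pydantic's `ValidationError` subclasses `ValueError`, so existing `except ValueError` handling around config parsing keeps working.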
```yaml
mode: "disabled" # set to "permissive" to observe, "enforce" to block
priority: 201 # Run after telemetry exporter, before late-stage plugins
conditions: []
config:
  learning_window_seconds: 3600
```
Config comments imply that setting mode: "enforce" is sufficient to block, but the plugin only returns continue_processing=False when config.action == "block" as well. As written, switching to enforce while leaving action: "warn" will still never block. Consider clarifying this in the inline comment (or simplifying by deriving block/warn solely from mode).
```python
plugin = ToolCallAnomalyDetectionPlugin(
    _make_config(learning_window_seconds=9999, max_history_per_user=10)
)
ctx = _make_context()

for i in range(20):
    await plugin.tool_pre_invoke(_pre_payload(f"tool_{i}"), _make_context())
```
In test_history_pruned, ctx = _make_context() is assigned but never used (the loop passes a fresh _make_context() each time). Removing the unused variable avoids lint warnings and keeps the test focused.
```python
meta: Dict[str, Any] = {
    "anomaly_risk_score": round(risk_score, 4),
    "anomaly_novelty": round(novelty, 4),
    "anomaly_burst": round(burst, 4),
    "anomaly_frequency": round(frequency, 4),
    "anomaly_off_hours": self._is_off_hours(),
    "anomaly_user": user_id,
```
_is_off_hours() is called twice for the same invocation (once in _composite_score and again when populating meta). Besides extra work, this can theoretically yield inconsistent anomaly_off_hours vs the score at an hour boundary. Compute the off-hours boolean once per call and reuse it for both scoring and metadata.
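The compute-once fix can be sketched as below. The helper names (`is_off_hours`, `score_call`) are illustrative; the point is that the same boolean feeds both the score and the metadata, so they can never disagree at an hour boundary.

```python
from datetime import datetime, timezone


def is_off_hours(hour: int, start: int = 22, end: int = 6) -> bool:
    if start > end:  # window wraps midnight, e.g. 22:00-06:00 UTC
        return hour >= start or hour < end
    return start <= hour < end


def score_call(base_score: float, off_hours_bonus: float = 0.15) -> dict:
    hour = datetime.now(timezone.utc).hour
    off = is_off_hours(hour)  # computed exactly once per invocation
    risk = min(1.0, base_score + (off_hours_bonus if off else 0.0))
    # The same `off` value is used for scoring and for the reported metadata.
    return {"anomaly_risk_score": risk, "anomaly_off_hours": off}


meta = score_call(0.5)
```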
```python
def _get_user_id(self, context: PluginContext) -> str:
    """Extract user identifier from the plugin context."""
    gc = context.global_context
    if isinstance(gc.user, dict):
        return gc.user.get("email", gc.user.get("sub", "anonymous"))
    return gc.user or "anonymous"

def _get_baseline(self, user_id: str) -> _UserBaseline:
    """Return the baseline for *user_id*, creating one if needed."""
    if user_id not in self._baselines:
        self._baselines[user_id] = _UserBaseline()
    return self._baselines[user_id]
```
Baselines are keyed only by user_id and stored on the plugin instance, which is shared across requests. In a multi-tenant deployment this can mix behavior across tenants (same email/sub in different tenants), and concurrent requests for the same user can interleave updates to _baselines/_UserBaseline without synchronization. Consider scoping the baseline key by (tenant_id, user_id) (and possibly server_id) and using an asyncio.Lock (global or per-user) around baseline mutations.
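A minimal sketch of the tenant-scoped, lock-guarded variant follows. The `tenant_id` field and the `ScopedBaselines` class are assumptions for illustration; whatever tenant identifier the gateway context actually exposes would slot into the key.

```python
import asyncio
from collections import defaultdict
from typing import Dict, Set, Tuple

BaselineKey = Tuple[str, str]  # (tenant_id, user_id)


class ScopedBaselines:
    def __init__(self) -> None:
        self._tools: Dict[BaselineKey, Set[str]] = defaultdict(set)
        # One lock per (tenant, user) key serializes concurrent baseline updates.
        self._locks: Dict[BaselineKey, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def record(self, tenant_id: str, user_id: str, tool: str) -> None:
        key = (tenant_id, user_id)
        async with self._locks[key]:
            self._tools[key].add(tool)

    def known_tools(self, tenant_id: str, user_id: str) -> Set[str]:
        return self._tools[(tenant_id, user_id)]


store = ScopedBaselines()


async def main() -> None:
    # The same email in two tenants no longer shares one baseline.
    await asyncio.gather(
        store.record("tenant-a", "alice@example.com", "db_query"),
        store.record("tenant-b", "alice@example.com", "search"),
    )


asyncio.run(main())
print(store.known_tools("tenant-a", "alice@example.com"))  # {'db_query'}
```

A single global lock would also be correct but would serialize unrelated users; per-key locks keep contention local.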
```python
plugin = ToolCallAnomalyDetectionPlugin(
    _make_config(
        learning_window_seconds=0,
        off_hours_start=0,
        off_hours_end=23,  # always off-hours
        off_hours_score_bonus=0.15,
    )
)
baseline = plugin._get_baseline("alice@example.com")
baseline.first_seen = time.time() - 7200
baseline.known_tools.add("db_query")
baseline.known_arg_signatures["db_query"].add(frozenset(["query"]))
baseline.tool_counts["db_query"] = 50

ctx = _make_context()
result = await plugin.tool_pre_invoke(_pre_payload("db_query", {"query": "x"}), ctx)

assert result.metadata["anomaly_off_hours"] is True
```
test_off_hours_bonus is time-dependent: off_hours_start=0, off_hours_end=23 is not actually “always off-hours” (UTC hour 23 will be in-hours), so this test can fail depending on when it runs. Make the test deterministic by fixing the current UTC hour (e.g., patching the clock used by _is_off_hours) or by refactoring the plugin to accept an injectable time provider for off-hours checks.
Suggested change:
```diff
-plugin = ToolCallAnomalyDetectionPlugin(
-    _make_config(
-        learning_window_seconds=0,
-        off_hours_start=0,
-        off_hours_end=23,  # always off-hours
-        off_hours_score_bonus=0.15,
-    )
-)
-baseline = plugin._get_baseline("alice@example.com")
-baseline.first_seen = time.time() - 7200
-baseline.known_tools.add("db_query")
-baseline.known_arg_signatures["db_query"].add(frozenset(["query"]))
-baseline.tool_counts["db_query"] = 50
-ctx = _make_context()
-result = await plugin.tool_pre_invoke(_pre_payload("db_query", {"query": "x"}), ctx)
-assert result.metadata["anomaly_off_hours"] is True
+# Patch the off-hours check to make the test independent of the actual current time.
+with patch.object(ToolCallAnomalyDetectionPlugin, "_is_off_hours", return_value=True):
+    plugin = ToolCallAnomalyDetectionPlugin(
+        _make_config(
+            learning_window_seconds=0,
+            off_hours_start=0,
+            off_hours_end=23,  # configuration under test
+            off_hours_score_bonus=0.15,
+        )
+    )
+    baseline = plugin._get_baseline("alice@example.com")
+    baseline.first_seen = time.time() - 7200
+    baseline.known_tools.add("db_query")
+    baseline.known_arg_signatures["db_query"].add(frozenset(["query"]))
+    baseline.tool_counts["db_query"] = 50
+    ctx = _make_context()
+    result = await plugin.tool_pre_invoke(_pre_payload("db_query", {"query": "x"}), ctx)
+    assert result.metadata["anomaly_off_hours"] is True
```
```python
# Always record (keeps baseline fresh)
self._record_call(user_id, tool_name, arg_keys, now)

meta: Dict[str, Any] = {
    "anomaly_risk_score": round(risk_score, 4),
```
Call recording happens unconditionally before the block decision, so even calls that get blocked will be added to the user’s baseline. This can “teach” the baseline about malicious activity and reduce detection quality over time. Consider only recording after deciding to allow/warn, and skip recording when returning continue_processing=False.
🧱 New Plugin
🔗 Closes
Closes #3845
🚀 Summary
Add a tool call anomaly detection plugin that learns per-user tool-calling baselines and flags behavioral deviations: burst invocations, novel tool access, unusual frequency patterns, and off-hours activity. Runs on `tool_pre_invoke` and `tool_post_invoke` hooks with zero external dependencies.

🧪 Checks
- `make lint plugins` passes
- `make test` passes