Skip to content

feat(plugin): add tool call anomaly detection plugin#3846

Open
anujshrivastava15 wants to merge 1 commit intoIBM:mainfrom
anujshrivastava15:feat/3845-tool-call-anomaly-detection
Open

feat(plugin): add tool call anomaly detection plugin#3846
anujshrivastava15 wants to merge 1 commit intoIBM:mainfrom
anujshrivastava15:feat/3845-tool-call-anomaly-detection

Conversation

@anujshrivastava15
Copy link

🧱 New Plugin

🔗 Closes

Closes #3845

🚀 Summary

Add a tool call anomaly detection plugin that learns per-user tool-calling baselines and flags behavioral deviations: burst invocations, novel tool access, unusual frequency patterns, and off-hours activity. Runs on tool_pre_invoke and tool_post_invoke hooks with zero external dependencies.

🧪 Checks

  • Plugin was bootstrapped with the CLI (native or external template)
  • Unit tests created for the new plugin
  • make lint plugins passes
  • make test passes
  • CHANGELOG updated (if user-facing)
  • README documentation was created for the plugin
  • New plugin added to the documentation linking to the README above

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new security-focused plugin to detect anomalous MCP tool-calling behavior by learning per-user baselines and scoring deviations, and wires it into the default plugin configuration alongside unit tests and plugin documentation.

Changes:

  • Introduces ToolCallAnomalyDetectionPlugin with per-user baseline learning and anomaly scoring on tool_pre_invoke, plus metadata enrichment on tool_post_invoke.
  • Adds unit tests for learning/detection behavior, identity extraction, and pruning.
  • Adds plugin manifest/README and registers the plugin in plugins/config.yaml (disabled by default).

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
plugins/tool_call_anomaly_detection/tool_call_anomaly_detection.py New plugin implementation for baseline learning + anomaly scoring and (optional) blocking.
tests/unit/plugins/test_tool_call_anomaly_detection.py Unit tests covering learning vs detection, burst/novelty/off-hours, identity extraction, pruning.
plugins/tool_call_anomaly_detection/plugin-manifest.yaml Plugin manifest with hooks and default configuration.
plugins/tool_call_anomaly_detection/README.md Plugin usage/config documentation and described signals/metadata.
plugins/config.yaml Registers the new plugin in the default plugin list (disabled).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +133 to +156
self._cfg = AnomalyDetectionConfig(**(config.config or {}))
self._baselines: Dict[str, _UserBaseline] = {}

# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

def _get_user_id(self, context: PluginContext) -> str:
gc = context.global_context
if isinstance(gc.user, dict):
return gc.user.get("email", gc.user.get("sub", "anonymous"))
return gc.user or "anonymous"

def _get_baseline(self, user_id: str) -> _UserBaseline:
if user_id not in self._baselines:
self._baselines[user_id] = _UserBaseline()
return self._baselines[user_id]

def _is_learning(self, baseline: _UserBaseline) -> bool:
return (time.time() - baseline.first_seen) < self._cfg.learning_window_seconds

def _prune_history(self, baseline: _UserBaseline) -> None:
if len(baseline.call_history) > self._cfg.max_history_per_user:
baseline.call_history = baseline.call_history[-self._cfg.max_history_per_user:]
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per-user baseline memory is only bounded for call_history; known_tools, known_arg_signatures, tool_counts, and even _baselines itself can grow without limit as new users/tools/arg-shapes appear. This can lead to unbounded memory growth under long-running workloads or adversarial tool names. Consider adding eviction (TTL/LRU), maximum tracked users, and/or capping per-user distinct tools/arg signatures when max_history_per_user is exceeded.

Copilot uses AI. Check for mistakes.
Comment on lines +9 to +10
- `tool_pre_invoke` — Scores the incoming call against the user's baseline; warns or blocks above threshold
- `tool_post_invoke` — Records the call into the baseline and enriches response metadata with risk scores
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The README states tool_post_invoke "records the call into the baseline", but the implementation only reads anomaly_meta from context and returns it; baseline recording happens in tool_pre_invoke. Please update the README to match actual behavior, or update tool_post_invoke to perform the recording if that's the intended design.

Suggested change
- `tool_pre_invoke` — Scores the incoming call against the user's baseline; warns or blocks above threshold
- `tool_post_invoke`Records the call into the baseline and enriches response metadata with risk scores
- `tool_pre_invoke` — Scores the incoming call against the user's baseline, updates the baseline, and may warn or block above threshold
- `tool_post_invoke`Reads anomaly metadata from context and enriches the tool response with risk scores and related fields

Copilot uses AI. Check for mistakes.
Comment on lines +59 to +62
- `anomaly_signals` — Dict of individual signal scores (novelty, burst, frequency)
- `anomaly_off_hours` — Whether off-hours bonus was applied
- `anomaly_action` — Action taken (allow / warn / block)

Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The README lists metadata fields (anomaly_signals, anomaly_action) that the plugin never sets, and the plugin instead emits separate keys (anomaly_novelty, anomaly_burst, anomaly_frequency, etc.). Please update the README to match the emitted metadata, or update the plugin to emit the documented fields.

Suggested change
- `anomaly_signals` — Dict of individual signal scores (novelty, burst, frequency)
- `anomaly_off_hours` — Whether off-hours bonus was applied
- `anomaly_action` — Action taken (allow / warn / block)
- `anomaly_novelty` — Novelty signal score contributing to the composite risk
- `anomaly_burst` — Burst-rate signal score contributing to the composite risk
- `anomaly_frequency` — Frequency-pattern signal score contributing to the composite risk
- `anomaly_off_hours` — Whether an off-hours bonus was applied when scoring

Copilot uses AI. Check for mistakes.
Comment on lines +1062 to +1065
tags: ["security", "anomaly-detection", "behavioral", "audit"]
mode: "disabled" # set to "permissive" to observe, "enforce" to block
priority: 201 # Run after telemetry exporter, before late-stage plugins
conditions: []
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This configuration comment implies that setting mode: "enforce" is sufficient to block, but the plugin only blocks when action is set to "block" and the framework mode is enforce (permissive mode will log and continue). Please clarify the comment (and/or set action: "block" in the enforce example) to avoid misconfiguration.

Copilot uses AI. Check for mistakes.
Comment on lines +193 to +213
async def test_off_hours_bonus(self):
plugin = ToolCallAnomalyDetectionPlugin(
_make_config(
learning_window_seconds=0,
off_hours_start=0,
off_hours_end=23, # always off-hours
off_hours_score_bonus=0.15,
)
)
baseline = plugin._get_baseline("alice@example.com")
baseline.first_seen = time.time() - 7200
baseline.known_tools.add("db_query")
baseline.known_arg_signatures["db_query"].add(frozenset(["query"]))
baseline.tool_counts["db_query"] = 50

ctx = _make_context()
result = await plugin.tool_pre_invoke(_pre_payload("db_query", {"query": "x"}), ctx)

assert result.metadata["anomaly_off_hours"] is True


Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_off_hours_bonus depends on the real current UTC hour (_is_off_hours() uses datetime.now(timezone.utc)), so it will fail when tests run at 23:xx UTC because off_hours_end=23 makes hour 23 not off-hours. Please mock the clock (e.g., patch datetime.datetime.now or refactor the plugin to use an injectable time source) so the test is deterministic.

Copilot uses AI. Check for mistakes.
Comment on lines +259 to +283
# Always record (keeps baseline fresh)
self._record_call(user_id, tool_name, arg_keys, now)

meta: Dict[str, Any] = {
"anomaly_risk_score": round(risk_score, 4),
"anomaly_novelty": round(novelty, 4),
"anomaly_burst": round(burst, 4),
"anomaly_frequency": round(frequency, 4),
"anomaly_off_hours": self._is_off_hours(),
"anomaly_user": user_id,
"anomaly_tool": tool_name,
}

# Save for post_invoke enrichment
context.set_state("anomaly_meta", meta)
context.set_state("anomaly_risk_score", risk_score)

if risk_score >= self._cfg.block_threshold and self._cfg.action == "block":
logger.warning(
"Anomaly detection: blocking tool call %s for user %s (risk=%.2f)",
tool_name,
user_id,
risk_score,
)
return ToolPreInvokeResult(
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plugin records the call into the baseline (_record_call) before deciding to block. If a call is blocked, adding it to known_tools/signatures can reduce novelty risk for subsequent attempts and effectively lets a blocked attempt “train” the baseline. Consider recording only after allow/warn, or recording blocked calls separately without promoting them into the baseline used for novelty scoring.

Copilot uses AI. Check for mistakes.
Comment on lines +22 to +25
import math
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

math and several typing imports (Optional, Tuple) are unused in this module and will fail linting. Please remove unused imports or use them where intended.

Suggested change
import math
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple
import time
from collections import defaultdict
from typing import Any, Dict, List, Set

Copilot uses AI. Check for mistakes.
### Anomaly Signals
- **Novelty** — Tool name never seen in the user's history
- **Burst** — Call rate exceeds threshold within the sliding window
- **Frequency** — Tool called significantly more than its historical average
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "Frequency" signal description says "called significantly more than its historical average", but _score_frequency flags tools that are rarely used (tool_fraction < 0.01). Please align the README wording with the implemented heuristic or adjust the scoring logic to match the documented behavior.

Suggested change
- **Frequency** — Tool called significantly more than its historical average
- **Frequency** — Tool called significantly less than its historical average (unusually rare usage)

Copilot uses AI. Check for mistakes.
Comment on lines +13 to +28
from unittest.mock import patch

import pytest

from mcpgateway.plugins.framework import (
GlobalContext,
PluginConfig,
PluginContext,
ToolPostInvokePayload,
ToolPreInvokePayload,
)
from mcpgateway.plugins.framework.hooks.tools import ToolHookType
from plugins.tool_call_anomaly_detection.tool_call_anomaly_detection import (
AnomalyDetectionConfig,
ToolCallAnomalyDetectionPlugin,
)
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

patch and AnomalyDetectionConfig are imported but never used in this test module, which will fail linting. Please remove unused imports or use patch to make the time-based tests deterministic (see off-hours test below).

Copilot uses AI. Check for mistakes.
Add a new security plugin that learns per-user tool-calling baselines
and flags behavioral anomalies: burst invocations, novel tool access,
unusual frequency patterns, and off-hours activity.

Closes IBM#3845

Signed-off-by: Anuj Shrivastava <ashrivastava@ibm.com>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +71 to +77
block_threshold: float = Field(default=0.8, ge=0.0, le=1.0)
warn_threshold: float = Field(default=0.5, ge=0.0, le=1.0)
max_history_per_user: int = Field(default=1000, ge=10)
off_hours_start: int = Field(default=22, ge=0, le=23)
off_hours_end: int = Field(default=6, ge=0, le=23)
off_hours_score_bonus: float = Field(default=0.15, ge=0.0, le=1.0)
action: str = Field(default="warn") # "warn" | "block"
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

action is a free-form string and there’s no validation that warn_threshold <= block_threshold. A typo (e.g., "blok") would silently disable blocking, and inverted thresholds can lead to confusing behavior. Consider using a Literal["warn","block"]/Enum for action and adding a Pydantic validator to enforce sensible threshold ordering.

Copilot uses AI. Check for mistakes.
Comment on lines +1063 to +1067
mode: "disabled" # set to "permissive" to observe, "enforce" to block
priority: 201 # Run after telemetry exporter, before late-stage plugins
conditions: []
config:
learning_window_seconds: 3600
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Config comments imply that setting mode: "enforce" is sufficient to block, but the plugin only returns continue_processing=False when config.action == "block" as well. As written, switching to enforce while leaving action: "warn" will still never block. Consider clarifying this in the inline comment (or simplifying by deriving block/warn solely from mode).

Copilot uses AI. Check for mistakes.
Comment on lines +245 to +252
plugin = ToolCallAnomalyDetectionPlugin(
_make_config(learning_window_seconds=9999, max_history_per_user=10)
)
ctx = _make_context()

for i in range(20):
await plugin.tool_pre_invoke(_pre_payload(f"tool_{i}"), _make_context())

Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In test_history_pruned, ctx = _make_context() is assigned but never used (the loop passes a fresh _make_context() each time). Removing the unused variable avoids lint warnings and keeps the test focused.

Copilot uses AI. Check for mistakes.
Comment on lines +270 to +276
meta: Dict[str, Any] = {
"anomaly_risk_score": round(risk_score, 4),
"anomaly_novelty": round(novelty, 4),
"anomaly_burst": round(burst, 4),
"anomaly_frequency": round(frequency, 4),
"anomaly_off_hours": self._is_off_hours(),
"anomaly_user": user_id,
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_is_off_hours() is called twice for the same invocation (once in _composite_score and again when populating meta). Besides extra work, this can theoretically yield inconsistent anomaly_off_hours vs the score at an hour boundary. Compute the off-hours boolean once per call and reuse it for both scoring and metadata.

Copilot uses AI. Check for mistakes.
Comment on lines +141 to +152
def _get_user_id(self, context: PluginContext) -> str:
"""Extract user identifier from the plugin context."""
gc = context.global_context
if isinstance(gc.user, dict):
return gc.user.get("email", gc.user.get("sub", "anonymous"))
return gc.user or "anonymous"

def _get_baseline(self, user_id: str) -> _UserBaseline:
"""Return the baseline for *user_id*, creating one if needed."""
if user_id not in self._baselines:
self._baselines[user_id] = _UserBaseline()
return self._baselines[user_id]
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Baselines are keyed only by user_id and stored on the plugin instance, which is shared across requests. In a multi-tenant deployment this can mix behavior across tenants (same email/sub in different tenants), and concurrent requests for the same user can interleave updates to _baselines/_UserBaseline without synchronization. Consider scoping the baseline key by (tenant_id, user_id) (and possibly server_id) and using an asyncio.Lock (global or per-user) around baseline mutations.

Copilot uses AI. Check for mistakes.
Comment on lines +194 to +211
plugin = ToolCallAnomalyDetectionPlugin(
_make_config(
learning_window_seconds=0,
off_hours_start=0,
off_hours_end=23, # always off-hours
off_hours_score_bonus=0.15,
)
)
baseline = plugin._get_baseline("alice@example.com")
baseline.first_seen = time.time() - 7200
baseline.known_tools.add("db_query")
baseline.known_arg_signatures["db_query"].add(frozenset(["query"]))
baseline.tool_counts["db_query"] = 50

ctx = _make_context()
result = await plugin.tool_pre_invoke(_pre_payload("db_query", {"query": "x"}), ctx)

assert result.metadata["anomaly_off_hours"] is True
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_off_hours_bonus is time-dependent: off_hours_start=0, off_hours_end=23 is not actually “always off-hours” (UTC hour 23 will be in-hours), so this test can fail depending on when it runs. Make the test deterministic by fixing the current UTC hour (e.g., patching the clock used by _is_off_hours) or by refactoring the plugin to accept an injectable time provider for off-hours checks.

Suggested change
plugin = ToolCallAnomalyDetectionPlugin(
_make_config(
learning_window_seconds=0,
off_hours_start=0,
off_hours_end=23, # always off-hours
off_hours_score_bonus=0.15,
)
)
baseline = plugin._get_baseline("alice@example.com")
baseline.first_seen = time.time() - 7200
baseline.known_tools.add("db_query")
baseline.known_arg_signatures["db_query"].add(frozenset(["query"]))
baseline.tool_counts["db_query"] = 50
ctx = _make_context()
result = await plugin.tool_pre_invoke(_pre_payload("db_query", {"query": "x"}), ctx)
assert result.metadata["anomaly_off_hours"] is True
# Patch the off-hours check to make the test independent of the actual current time.
with patch.object(ToolCallAnomalyDetectionPlugin, "_is_off_hours", return_value=True):
plugin = ToolCallAnomalyDetectionPlugin(
_make_config(
learning_window_seconds=0,
off_hours_start=0,
off_hours_end=23, # configuration under test
off_hours_score_bonus=0.15,
)
)
baseline = plugin._get_baseline("alice@example.com")
baseline.first_seen = time.time() - 7200
baseline.known_tools.add("db_query")
baseline.known_arg_signatures["db_query"].add(frozenset(["query"]))
baseline.tool_counts["db_query"] = 50
ctx = _make_context()
result = await plugin.tool_pre_invoke(_pre_payload("db_query", {"query": "x"}), ctx)
assert result.metadata["anomaly_off_hours"] is True

Copilot uses AI. Check for mistakes.
Comment on lines +267 to +271
# Always record (keeps baseline fresh)
self._record_call(user_id, tool_name, arg_keys, now)

meta: Dict[str, Any] = {
"anomaly_risk_score": round(risk_score, 4),
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call recording happens unconditionally before the block decision, so even calls that get blocked will be added to the user’s baseline. This can “teach” the baseline about malicious activity and reduce detection quality over time. Consider only recording after deciding to allow/warn, and skip recording when returning continue_processing=False.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE]: Tool call anomaly detection plugin

2 participants