Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 134 additions & 2 deletions src/fastmcp/server/auth/providers/introspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

import base64
import contextlib
import hashlib
import time
from dataclasses import dataclass
from typing import Any, Literal, get_args

import httpx
Expand All @@ -37,6 +39,15 @@

logger = get_logger(__name__)


@dataclass
class _IntrospectionCacheEntry:
    """A single introspection-cache slot: a validated token plus its expiry.

    Instances are stored by ``_set_cached`` (which inserts a deep copy of the
    ``AccessToken``) and served by ``_get_cached``.
    """

    # Validated token data as built from the introspection response.
    result: AccessToken
    # Absolute wall-clock deadline (``time.time()`` seconds) after which this
    # entry must no longer be served.
    expires_at: float


ClientAuthMethod = Literal["client_secret_basic", "client_secret_post"]


Expand All @@ -60,6 +71,10 @@ class IntrospectionTokenVerifier(TokenVerifier):
- Your tokens require real-time revocation checking
- Your authorization server supports RFC 7662 introspection

Caching is disabled by default to preserve real-time revocation semantics.
Set ``cache_ttl_seconds`` to enable caching and reduce load on the
introspection endpoint (e.g., ``cache_ttl_seconds=300`` for 5 minutes).

Example:
```python
verifier = IntrospectionTokenVerifier(
Expand All @@ -71,6 +86,9 @@ class IntrospectionTokenVerifier(TokenVerifier):
```
"""

# Default cache settings
DEFAULT_MAX_CACHE_SIZE = 10000

def __init__(
self,
*,
Expand All @@ -81,6 +99,8 @@ def __init__(
timeout_seconds: int = 10,
required_scopes: list[str] | None = None,
base_url: AnyHttpUrl | str | None = None,
cache_ttl_seconds: int | None = None,
max_cache_size: int | None = None,
http_client: httpx.AsyncClient | None = None,
):
"""
Expand All @@ -95,6 +115,12 @@ def __init__(
timeout_seconds: HTTP request timeout in seconds (default: 10)
required_scopes: Required scopes for all tokens (optional)
base_url: Base URL for TokenVerifier protocol
cache_ttl_seconds: How long to cache introspection results in seconds.
Caching is disabled by default (None) to preserve real-time
revocation semantics. Set to a positive integer to enable caching
(e.g., 300 for 5 minutes).
max_cache_size: Maximum number of tokens to cache when caching is
enabled. Default: 10000.
http_client: Optional httpx.AsyncClient for connection pooling. When provided,
the client is reused across calls and the caller is responsible for its
lifecycle. When None (default), a fresh client is created per call.
Expand Down Expand Up @@ -128,6 +154,98 @@ def __init__(
self._http_client = http_client
self.logger = get_logger(__name__)

# Cache configuration (None or 0 = disabled)
self._cache_ttl = cache_ttl_seconds or 0
self._max_cache_size = (
max_cache_size
if max_cache_size is not None
else self.DEFAULT_MAX_CACHE_SIZE
)
self._cache: dict[str, _IntrospectionCacheEntry] = {}
self._last_cleanup = time.monotonic()
self._cleanup_interval = 60 # Cleanup every 60 seconds

def _hash_token(self, token: str) -> str:
"""Hash token for use as cache key.

Using SHA-256 for memory efficiency (fixed 64-char hex digest
regardless of token length).
"""
return hashlib.sha256(token.encode("utf-8")).hexdigest()

def _cleanup_expired_cache(self) -> None:
"""Remove expired entries from cache."""
now = time.time()
expired = [key for key, entry in self._cache.items() if entry.expires_at < now]
for key in expired:
del self._cache[key]
if expired:
self.logger.debug("Cleaned up %d expired cache entries", len(expired))

def _maybe_cleanup(self) -> None:
"""Periodically cleanup expired entries to prevent unbounded growth."""
now = time.monotonic()
if now - self._last_cleanup > self._cleanup_interval:
self._cleanup_expired_cache()
self._last_cleanup = now

def _get_cached(self, token: str) -> tuple[bool, AccessToken | None]:
"""Get cached introspection result.

Returns:
Tuple of (is_cached, result):
- (True, AccessToken) if cached valid token
- (False, None) if not in cache or expired
"""
if self._cache_ttl <= 0 or self._max_cache_size <= 0:
return (False, None) # Caching disabled

cache_key = self._hash_token(token)
entry = self._cache.get(cache_key)

if entry is None:
return (False, None) # Not in cache

if entry.expires_at < time.time():
del self._cache[cache_key]
return (False, None) # Expired

# Return a copy to prevent mutations from affecting cached value
return (True, entry.result.model_copy(deep=True))

def _set_cached(self, token: str, result: AccessToken) -> None:
"""Cache a valid introspection result with TTL.

Only successful validations are cached. Failures (inactive, expired,
missing scopes, errors) are never cached to avoid sticky false negatives.
"""
if self._cache_ttl <= 0 or self._max_cache_size <= 0:
return # Caching disabled

# Periodic cleanup
self._maybe_cleanup()

# Check cache size limit
if len(self._cache) >= self._max_cache_size:
self._cleanup_expired_cache()
# If still at limit after cleanup, evict oldest entry
if len(self._cache) >= self._max_cache_size:
oldest_key = next(iter(self._cache))
Comment on lines +229 to +233

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject non-positive max_cache_size values

When caching is enabled and max_cache_size is set to 0 (or any negative value), the size check is immediately true even for an empty cache, so next(iter(self._cache)) raises StopIteration. That exception is then swallowed by the broad except Exception in verify_token, causing otherwise valid introspection responses to return None and reject tokens. Validate max_cache_size as a positive integer (or treat non-positive values as cache-disabled) before entering this eviction path.

Useful? React with 👍 / 👎.

del self._cache[oldest_key]

cache_key = self._hash_token(token)

# Use token's expiration if available and sooner than TTL
expires_at = time.time() + self._cache_ttl
if result.expires_at:
expires_at = min(expires_at, float(result.expires_at))

# Store a deep copy to prevent mutations from affecting cached value
self._cache[cache_key] = _IntrospectionCacheEntry(
result=result.model_copy(deep=True),
expires_at=expires_at,
)

def _create_basic_auth_header(self) -> str:
"""Create HTTP Basic Auth header value from client credentials."""
credentials = f"{self.client_id}:{self.client_secret}"
Expand Down Expand Up @@ -165,12 +283,21 @@ async def verify_token(self, token: str) -> AccessToken | None:
authenticated using the configured client authentication method (client_secret_basic
or client_secret_post).

Results are cached in-memory to reduce load on the introspection endpoint.
Cache TTL and size are configurable via constructor parameters.

Args:
token: The opaque token string to validate

Returns:
AccessToken object if valid and active, None if invalid, inactive, or expired
"""
# Check cache first
is_cached, cached_result = self._get_cached(token)
if is_cached:
self.logger.debug("Token introspection cache hit")
return cached_result

try:
async with (
contextlib.nullcontext(self._http_client)
Expand Down Expand Up @@ -203,7 +330,7 @@ async def verify_token(self, token: str) -> AccessToken | None:
headers=headers,
)

# Check for HTTP errors
# Check for HTTP errors - don't cache HTTP errors (may be transient)
if response.status_code != 200:
self.logger.debug(
"Token introspection failed: HTTP %d - %s",
Expand All @@ -215,6 +342,8 @@ async def verify_token(self, token: str) -> AccessToken | None:
introspection_data = response.json()

# Check if token is active (required field per RFC 7662)
# Don't cache inactive tokens - they may become valid later
# (e.g., tokens with future nbf, or propagation delays)
if not introspection_data.get("active", False):
self.logger.debug("Token introspection returned active=false")
return None
Comment on lines 347 to 349

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid caching every active=false introspection result

This branch caches None for any active=false response, and later calls short-circuit on that cache entry without re-introspecting. In environments where a token can transition from inactive to active (for example, tokens with a future nbf or short-lived authorization-server propagation delays), the verifier will continue rejecting an otherwise valid token for the full cache TTL. Consider not caching active=false results, or at least bounding that cache lifetime by a claim like nbf when present.

Useful? React with 👍 / 👎.

Expand All @@ -239,6 +368,7 @@ async def verify_token(self, token: str) -> AccessToken | None:
scopes = self._extract_scopes(introspection_data)

# Check required scopes
# Don't cache scope failures - permissions may be updated dynamically
if self.required_scopes:
token_scopes = set(scopes)
required_scopes = set(self.required_scopes)
Expand All @@ -251,13 +381,15 @@ async def verify_token(self, token: str) -> AccessToken | None:
return None

# Create AccessToken with introspection response data
return AccessToken(
result = AccessToken(
token=token,
client_id=str(client_id),
scopes=scopes,
expires_at=int(exp) if exp else None,
claims=introspection_data, # Store full response for extensibility
)
self._set_cached(token, result)
return result

except httpx.TimeoutException:
self.logger.debug(
Expand Down
Loading