Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions .claude-plugin/marketplace.json

This file was deleted.

10 changes: 5 additions & 5 deletions lmms_eval/caching/fs_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
"""

import os
import subprocess
import shutil
import subprocess
from enum import Enum
from typing import Optional

Expand Down Expand Up @@ -171,16 +171,16 @@ def detect_fs_type(path: str) -> FsType:

def find_local_scratch(min_free_gb: float = 1.0) -> Optional[str]:
"""Find a suitable local fast-storage directory for cache scratch space.

Priority order:
1. ``$LMMS_LOCAL_CACHE`` environment variable (explicit user override)
2. ``/local/scratch`` (common HPC convention)
3. ``/scratch`` (another common convention)
4. ``/tmp`` (always available, but may be tmpfs / size-limited)

A candidate is accepted only if it is writable and has at least
``min_free_gb`` GB of free space (default 1 GB).

Returns the first usable directory found, or ``None`` if none qualify.
"""
candidates = []
Expand All @@ -200,7 +200,7 @@ def find_local_scratch(min_free_gb: float = 1.0) -> Optional[str]:
if os.path.isdir(path) and os.access(path, os.W_OK):
try:
usage = shutil.disk_usage(path)
free_gb = usage.free / (1024 ** 3)
free_gb = usage.free / (1024**3)
if free_gb < min_free_gb:
eval_logger.debug(f"fs_detect: skipping {path} (only {free_gb:.1f} GB free, need {min_free_gb} GB)")
continue
Expand Down
33 changes: 23 additions & 10 deletions lmms_eval/caching/response_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import json
import os
import sqlite3
import urllib.parse
import time
import urllib.parse
from functools import partial
from typing import Any, Dict, List, Optional, Union

Expand All @@ -50,7 +50,7 @@
}
)

_SCHEMA_VERSION = 2
_SCHEMA_VERSION = 3


def _short_hash(value: str) -> str:
Expand Down Expand Up @@ -212,6 +212,7 @@ def compute_cache_key(
task_fingerprint: str = "",
content_hash: str = "",
model_fingerprint_hash: str = "",
eval_version: str = "",
) -> str:
"""Deterministic SHA-256 cache key for a model response.

Expand All @@ -220,6 +221,7 @@ def compute_cache_key(
requests that share the same (task_name, doc_id, idx).
``task_fingerprint`` enables automatic invalidation on YAML/prompt changes.
``model_fingerprint_hash`` ensures key-level model adapter isolation.
``eval_version`` isolates cache entries across lmms-eval releases.
"""
payload = {
"v": _SCHEMA_VERSION,
Expand All @@ -235,6 +237,8 @@ def compute_cache_key(
payload["tf"] = task_fingerprint
if model_fingerprint_hash:
payload["mfh"] = model_fingerprint_hash
if eval_version:
payload["ev"] = eval_version
data = json.dumps(payload, sort_keys=True, ensure_ascii=True, separators=(",", ":"))
return hashlib.sha256(data.encode("utf-8")).hexdigest()

Expand Down Expand Up @@ -272,12 +276,14 @@ def __init__(
model_fingerprint: str = "",
task_fingerprints: Optional[Dict[str, str]] = None,
shared_db_path: Optional[str] = None,
eval_version: str = "",
):
self.db_path = db_path
self.audit_path = audit_path
self.model_fingerprint = model_fingerprint
self._model_fingerprint_hash = _short_hash(model_fingerprint)
self._task_fingerprints: Dict[str, str] = task_fingerprints or {}
self._eval_version = eval_version

self.db = sqlite3.connect(db_path, timeout=30)
self.db.execute("PRAGMA journal_mode=WAL")
Expand All @@ -289,6 +295,12 @@ def __init__(
if self._model_fingerprint_hash:
self.db.execute("INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)", ("model_fingerprint_hash", self._model_fingerprint_hash))
self.db.execute("INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)", ("schema_version", str(_SCHEMA_VERSION)))
if eval_version:
# Warn if DB was written by a different lmms-eval version
row = self.db.execute("SELECT value FROM meta WHERE key = 'eval_version'").fetchone()
if row and row[0] != eval_version:
eval_logger.warning(f"ResponseCache: DB was last written by lmms-eval {row[0]}, " f"current version is {eval_version}. Cache keys now include version \u2014 " f"old entries will not match (safe, but no reuse).")
self.db.execute("INSERT OR REPLACE INTO meta (key, value) VALUES (?, ?)", ("eval_version", eval_version))
self.db.commit()

# Optional shared (read-only) DB for two-tier caching.
Expand All @@ -311,6 +323,7 @@ def __init__(
self._hits_shared = 0
self._misses = 0
self._skipped = 0

def _replay_audit_log(self) -> None:
"""Replay JSONL entries missing from SQLite (crash recovery)."""
if not os.path.exists(self.audit_path):
Expand Down Expand Up @@ -360,6 +373,7 @@ def _lookup(self, cache_key: str) -> Any:
except Exception:
pass # shared DB failure is non-fatal
return None

def _log_to_audit(
self,
request_type: str,
Expand Down Expand Up @@ -399,6 +413,8 @@ def _log_to_audit(
record["content_hash"] = content_hash
if model_fingerprint_hash:
record["model_fingerprint_hash"] = model_fingerprint_hash
if self._eval_version:
record["eval_version"] = self._eval_version
self._audit_file.write(json.dumps(record, ensure_ascii=False) + "\n")
self._audit_file.flush()
os.fsync(self._audit_file.fileno())
Expand Down Expand Up @@ -470,6 +486,7 @@ def execute(self, lm: Any, reqtype: str, requests: List[Instance]) -> list:
content_hash=ch,
task_fingerprint=tf,
model_fingerprint_hash=self._model_fingerprint_hash,
eval_version=self._eval_version,
)
cached = self._lookup(cache_key)
if cached is not None:
Expand Down Expand Up @@ -503,6 +520,7 @@ def execute(self, lm: Any, reqtype: str, requests: List[Instance]) -> list:
content_hash=ch,
task_fingerprint=tf,
model_fingerprint_hash=self._model_fingerprint_hash,
eval_version=self._eval_version,
)
if deterministic
else ""
Expand Down Expand Up @@ -561,6 +579,7 @@ def close(self) -> None:
self._shared_db = None
except Exception:
pass

def __del__(self):
try:
self.close()
Expand Down Expand Up @@ -665,17 +684,11 @@ def consolidate_cache(
"""
# Merge SQLite shards
merged_entries = ResponseCache.merge_shards(shard_db_paths, target_db_path)
eval_logger.info(
f"ResponseCache: consolidated {merged_entries} entries from "
f"{len(shard_db_paths)} shard(s) into {target_db_path}"
)
eval_logger.info(f"ResponseCache: consolidated {merged_entries} entries from " f"{len(shard_db_paths)} shard(s) into {target_db_path}")

# Merge JSONL audit logs
merged_lines = ResponseCache.merge_audit_logs(shard_audit_paths, target_audit_path)
eval_logger.info(
f"ResponseCache: consolidated {merged_lines} audit entries from "
f"{len(shard_audit_paths)} log(s) into {target_audit_path}"
)
eval_logger.info(f"ResponseCache: consolidated {merged_lines} audit entries from " f"{len(shard_audit_paths)} log(s) into {target_audit_path}")

# Cleanup shard files
if cleanup:
Expand Down
42 changes: 21 additions & 21 deletions lmms_eval/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
get_datetime_str,
get_git_branch_name,
get_git_commit_hash,
get_lmms_eval_cache_version,
get_lmms_eval_version_string,
handle_non_serializable,
hash_string,
Expand Down Expand Up @@ -304,15 +305,19 @@ def _adjust_config(task_dict):
world_size = int(os.environ.get("WORLD_SIZE", 1))

response_cache = None
_cache_target_db = None # final consolidated .db path (user-specified)
_cache_write_db = None # DB path used for writes during evaluation
_cache_target_db = None # final consolidated .db path (user-specified)
_cache_write_db = None # DB path used for writes during evaluation
_cache_write_audit = None
_cache_target_audit = None
_cache_shared_db = None # read-only shared DB for two-tier cache (may be None)
_cache_is_two_tier = False # True when local scratch != target (NFS case)
_cache_staging_dir = None # shared staging dir for multi-node two-tier merges
_cache_shared_db = None # read-only shared DB for two-tier cache (may be None)
_cache_is_two_tier = False # True when local scratch != target (NFS case)
_cache_staging_dir = None # shared staging dir for multi-node two-tier merges
if use_cache is not None:
from lmms_eval.caching.fs_detect import FsType, detect_fs_type, find_local_scratch
from lmms_eval.caching.fs_detect import (
FsType,
detect_fs_type,
find_local_scratch,
)

_FUNC_ADDR_RE = re.compile(r" at 0x[0-9a-fA-F]+>")

Expand Down Expand Up @@ -345,11 +350,7 @@ def _adjust_config(task_dict):
# place cache.db inside it. Otherwise treat as a .db file path.
_cache_target_db = use_cache
if os.path.isdir(_cache_target_db) or _cache_target_db.endswith(os.sep):
eval_logger.warning(
f"ResponseCache: --use_cache received a directory ({_cache_target_db}). "
"In future versions, pass a .db file path directly (e.g. --use_cache ./cache.db). "
"Auto-mapping to cache.db inside the directory."
)
eval_logger.warning(f"ResponseCache: --use_cache received a directory ({_cache_target_db}). " "In future versions, pass a .db file path directly (e.g. --use_cache ./cache.db). " "Auto-mapping to cache.db inside the directory.")
_cache_target_db = os.path.join(_cache_target_db, "cache.db")
elif not _cache_target_db.endswith(".db"):
_cache_target_db = _cache_target_db + ".db"
Expand Down Expand Up @@ -380,10 +381,7 @@ def _adjust_config(task_dict):
else:
_cache_write_db = os.path.join(local_cache_dir, "local.db")
_cache_write_audit = os.path.join(local_cache_dir, "local.audit.jsonl")
eval_logger.info(
f"ResponseCache: two-tier mode - writes to {_cache_write_db}, "
f"reads from local + shared ({_cache_target_db})"
)
eval_logger.info(f"ResponseCache: two-tier mode - writes to {_cache_write_db}, " f"reads from local + shared ({_cache_target_db})")
else:
eval_logger.warning("ResponseCache: target is on remote FS but no local scratch found, writing directly")

Expand All @@ -402,6 +400,7 @@ def _adjust_config(task_dict):
model_fingerprint=model_fp,
task_fingerprints=task_fingerprints,
shared_db_path=_cache_shared_db,
eval_version=get_lmms_eval_cache_version(),
)
eval_logger.info(f"ResponseCache initialized: {_cache_write_db}")

Expand Down Expand Up @@ -430,12 +429,8 @@ def _adjust_config(task_dict):
finally:
if response_cache is not None:
stats = response_cache.get_stats()
shared_info = f", {stats.get('hits_shared', 0)} from shared DB" if stats.get('hits_shared', 0) else ""
eval_logger.info(
f"ResponseCache stats: {stats['hits']} hits{shared_info}, "
f"{stats['misses']} misses, {stats['skipped_non_deterministic']} skipped, "
f"hit rate: {stats['hit_rate']:.1%}"
)
shared_info = f", {stats.get('hits_shared', 0)} from shared DB" if stats.get("hits_shared", 0) else ""
eval_logger.info(f"ResponseCache stats: {stats['hits']} hits{shared_info}, " f"{stats['misses']} misses, {stats['skipped_non_deterministic']} skipped, " f"hit rate: {stats['hit_rate']:.1%}")
response_cache.close()

# Post-eval consolidation: only on success, rank 0 only.
Expand All @@ -444,6 +439,7 @@ def _adjust_config(task_dict):
# Two-tier multi-node: each rank copies its local shard to shared staging.
# Rank 0 does its own copy here; other ranks copy below after barrier.
import shutil

for src in (_cache_write_db, _cache_write_audit):
if os.path.exists(src):
shutil.copy2(src, _cache_staging_dir)
Expand All @@ -454,13 +450,15 @@ def _adjust_config(task_dict):
lm.accelerator.wait_for_everyone()
elif distributed_executor_backend == "torchrun":
import torch.distributed as dist

if dist.is_initialized():
dist.barrier()

# Non-rank-0: copy local shards to staging (two-tier multi-node).
if eval_succeeded and response_cache is not None:
if _cache_is_two_tier and world_size > 1 and _cache_staging_dir and global_rank != 0:
import shutil

for src in (_cache_write_db, _cache_write_audit):
if os.path.exists(src):
shutil.copy2(src, _cache_staging_dir)
Expand All @@ -471,6 +469,7 @@ def _adjust_config(task_dict):
lm.accelerator.wait_for_everyone()
elif distributed_executor_backend == "torchrun":
import torch.distributed as dist

if dist.is_initialized():
dist.barrier()

Expand All @@ -494,6 +493,7 @@ def _adjust_config(task_dict):
# Clean up staging dir.
if _cache_staging_dir and os.path.isdir(_cache_staging_dir):
import shutil

shutil.rmtree(_cache_staging_dir, ignore_errors=True)
elif world_size > 1:
# Direct mode, multi-GPU: merge per-rank shards into target.
Expand Down
18 changes: 18 additions & 0 deletions lmms_eval/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,24 @@ def get_lmms_eval_version_string():
return f"{branch}@{commit[:8]}"


def get_lmms_eval_cache_version() -> str:
    """Return a version string used to isolate cache keys.

    Dev/editable installs prefer the git commit hash, since the published
    package version stays constant while the checkout changes. Plain pip
    installs (no git repo present) fall back to the installed package
    version via ``importlib.metadata``, and finally to ``"unknown"`` if
    even that lookup fails.
    """
    git_rev = get_git_commit_hash()
    if git_rev:
        return git_rev
    try:
        from importlib.metadata import version as _pkg_version

        return _pkg_version("lmms-eval")
    except Exception:
        # Package metadata unavailable (e.g. vendored copy) — last resort.
        return "unknown"


# ---------------------------------------------------------------------------
# Evaluation banner: printed above the results table
# ---------------------------------------------------------------------------
Expand Down