diff --git a/src/kimi_cli/cli/plugin.py b/src/kimi_cli/cli/plugin.py index 6aae18862..ac57cb39b 100644 --- a/src/kimi_cli/cli/plugin.py +++ b/src/kimi_cli/cli/plugin.py @@ -12,6 +12,50 @@ cli = typer.Typer(help="Manage plugins.") +def _parse_git_url(target: str) -> tuple[str, str | None, str | None]: + """Parse a git URL into (clone_url, subpath, branch). + + Splits .git URLs at the .git boundary. For GitHub/GitLab short URLs, + treats the first two path segments as owner/repo and the rest as subpath. + Strips ``tree/{branch}/`` or ``-/tree/{branch}/`` prefixes from + browser-copied URLs and returns the branch name. + """ + # Path 1: URL contains .git followed by / or end-of-string + idx = target.find(".git/") + if idx == -1 and target.endswith(".git"): + return target, None, None + if idx != -1: + clone_url = target[: idx + 4] # up to and including ".git" + rest = target[idx + 5 :] # after ".git/" + subpath = rest.strip("/") or None + return clone_url, subpath, None + + # Path 2: GitHub/GitLab short URL (no .git) + from urllib.parse import urlparse + + parsed = urlparse(target) + segments = [s for s in parsed.path.split("/") if s] + if len(segments) < 2: + return target, None, None + + owner_repo = "/".join(segments[:2]) + clone_url = f"{parsed.scheme}://{parsed.netloc}/{owner_repo}" + rest_segments = segments[2:] + + # GitLab uses /-/tree/{branch}/, strip leading "-" + if rest_segments and rest_segments[0] == "-": + rest_segments = rest_segments[1:] + + # Strip tree/{branch}/ prefix and extract branch + branch: str | None = None + if len(rest_segments) >= 2 and rest_segments[0] == "tree": + branch = rest_segments[1] + rest_segments = rest_segments[2:] + + subpath = "/".join(rest_segments) or None + return clone_url, subpath, branch + + def _resolve_source(target: str) -> tuple[Path, Path | None]: """Resolve plugin source to (local_dir, tmp_to_cleanup). @@ -23,22 +67,84 @@ def _resolve_source(target: str) -> tuple[Path, Path | None]: # Git URL if target.startswith(("https://", "git@", "http://")) and ( - target.endswith(".git") or "github.com/" in target or "gitlab.com/" in target + ".git/" in target + or target.endswith(".git") + or "github.com/" in target + or "gitlab.com/" in target ): import subprocess + clone_url, subpath, branch = _parse_git_url(target) + tmp = Path(tempfile.mkdtemp(prefix="kimi-plugin-")) - typer.echo(f"Cloning {target}...") + typer.echo(f"Cloning {clone_url}...") + clone_cmd = ["git", "clone", "--depth", "1"] + if branch: + clone_cmd += ["--branch", branch] + clone_cmd += [clone_url, str(tmp / "repo")] result = subprocess.run( - ["git", "clone", "--depth", "1", target, str(tmp / "repo")], + clone_cmd, capture_output=True, text=True, ) if result.returncode != 0: shutil.rmtree(tmp, ignore_errors=True) - typer.echo(f"Error: git clone failed: {result.stderr.strip()}", err=True) + typer.echo( + f"Error: git clone failed: {result.stderr.strip()}", + err=True, + ) raise typer.Exit(1) - return tmp / "repo", tmp + + repo_root = tmp / "repo" + + if subpath: + source = (repo_root / subpath).resolve() + if not source.is_relative_to(repo_root.resolve()): + shutil.rmtree(tmp, ignore_errors=True) + typer.echo( + f"Error: subpath escapes repository: {subpath}", + err=True, + ) + raise typer.Exit(1) + if not source.is_dir(): + shutil.rmtree(tmp, ignore_errors=True) + typer.echo( + f"Error: subpath '{subpath}' not found in repository", + err=True, + ) + raise typer.Exit(1) + if not (source / "plugin.json").exists(): + shutil.rmtree(tmp, ignore_errors=True) + typer.echo( + f"Error: no plugin.json in '{subpath}'", + err=True, + ) + raise typer.Exit(1) + return source, tmp + + # No subpath — check root first + if (repo_root / "plugin.json").exists(): + return repo_root, tmp + + # Scan one level for available plugins + available = sorted( + d.name for d in repo_root.iterdir() if d.is_dir() and (d / "plugin.json").exists() + ) + if available: + names = "\n".join(f" - {n}" for n in available) + typer.echo( + f"Error: No plugin.json at repository root. " + f"Available plugins:\n{names}\n" + f"Use: kimi plugin install /", + err=True, + ) + else: + typer.echo( + "Error: No plugin.json found in repository", + err=True, + ) + shutil.rmtree(tmp, ignore_errors=True) + raise typer.Exit(1) p = Path(target).expanduser().resolve() diff --git a/tests/core/test_plugin.py b/tests/core/test_plugin.py index 35fd22a03..7369ff9f3 100644 --- a/tests/core/test_plugin.py +++ b/tests/core/test_plugin.py @@ -2,9 +2,12 @@ import json from pathlib import Path +from unittest.mock import MagicMock, patch import pytest +import typer +from kimi_cli.cli.plugin import _parse_git_url, _resolve_source from kimi_cli.plugin import ( PluginError, PluginRuntime, @@ -272,3 +275,271 @@ def test_parse_plugin_json_ignores_unknown_fields(tmp_path: Path): ) spec = parse_plugin_json(plugin_dir / "plugin.json") assert spec.name == "p" + + +# --- _parse_git_url tests --- + + +@pytest.mark.parametrize( + "url, expected_clone, expected_subpath, expected_branch", + [ + # .git URLs — no subpath + ("https://host.com/org/repo.git", "https://host.com/org/repo.git", None, None), + ("http://host.com/org/repo.git", "http://host.com/org/repo.git", None, None), + # .git URLs — with subpath + ( + "https://host.com/org/repo.git/my-plugin", + "https://host.com/org/repo.git", + "my-plugin", + None, + ), + ( + "https://host.com/org/repo.git/packages/my-plugin", + "https://host.com/org/repo.git", + "packages/my-plugin", + None, + ), + # .git URLs — trailing slash (no subpath) + ("https://host.com/org/repo.git/", "https://host.com/org/repo.git", None, None), + # SSH URLs + ("git@github.com:org/repo.git", "git@github.com:org/repo.git", None, None), + ( + "git@github.com:org/repo.git/my-plugin", + "git@github.com:org/repo.git", + "my-plugin", + None, + ), + # .github in hostname should not false-match + ( + "https://github.com/my.github.io/tools.git/plugin", + "https://github.com/my.github.io/tools.git", + "plugin", + None, + ), + # GitHub short URLs — no subpath + ("https://github.com/org/repo", "https://github.com/org/repo", None, None), + # GitHub short URLs — with subpath + ( + "https://github.com/org/repo/my-plugin", + "https://github.com/org/repo", + "my-plugin", + None, + ), + ( + "https://github.com/org/repo/packages/my-plugin", + "https://github.com/org/repo", + "packages/my-plugin", + None, + ), + # GitHub short URLs — trailing slash + ("https://github.com/org/repo/", "https://github.com/org/repo", None, None), + # GitHub browser URL with tree/branch — extracts branch + ( + "https://github.com/org/repo/tree/main/my-plugin", + "https://github.com/org/repo", + "my-plugin", + "main", + ), + ( + "https://github.com/org/repo/tree/develop/packages/my-plugin", + "https://github.com/org/repo", + "packages/my-plugin", + "develop", + ), + # GitLab short URLs + ( + "https://gitlab.com/org/repo/my-plugin", + "https://gitlab.com/org/repo", + "my-plugin", + None, + ), + ( + "https://gitlab.com/org/repo/tree/main/my-plugin", + "https://gitlab.com/org/repo", + "my-plugin", + "main", + ), + # GitLab /-/tree/ format + ( + "https://gitlab.com/org/repo/-/tree/main/my-plugin", + "https://gitlab.com/org/repo", + "my-plugin", + "main", + ), + # Edge case: fewer than 2 path segments — returned as-is + ("https://github.com/org", "https://github.com/org", None, None), + ], +) +def test_parse_git_url( + url: str, + expected_clone: str, + expected_subpath: str | None, + expected_branch: str | None, +): + clone_url, subpath, branch = _parse_git_url(url) + assert clone_url == expected_clone + assert subpath == expected_subpath + assert branch == expected_branch + + +# --- _resolve_source git subpath tests --- + + +def _mock_git_clone(plugins: list[str] | None = None, root_plugin: bool = False): + """Create a mock for subprocess.run that simulates git clone.""" + + def side_effect(cmd, **kwargs): + dest = Path(cmd[-1]) + dest.mkdir(parents=True) + if root_plugin: + (dest / "plugin.json").write_text( + json.dumps({"name": "root-plugin", "version": "1.0.0"}), + encoding="utf-8", + ) + for name in plugins or []: + sub = dest / name + sub.mkdir(parents=True, exist_ok=True) + (sub / "plugin.json").write_text( + json.dumps({"name": name, "version": "1.0.0"}), + encoding="utf-8", + ) + result = MagicMock() + result.returncode = 0 + result.stderr = "" + return result + + return side_effect + + +def test_resolve_source_git_with_subpath(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Git URL with subpath returns the sub-directory.""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with patch( + "subprocess.run", + side_effect=_mock_git_clone(plugins=["my-plugin"]), + ): + source, tmp_dir = _resolve_source("https://github.com/org/repo.git/my-plugin") + assert source.name == "my-plugin" + assert (source / "plugin.json").exists() + assert tmp_dir is not None + + +def test_resolve_source_git_subpath_not_found(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Git URL with non-existent subpath raises Exit.""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with ( + patch("subprocess.run", side_effect=_mock_git_clone(plugins=[])), + pytest.raises(typer.Exit), + ): + _resolve_source("https://github.com/org/repo.git/no-such-plugin") + + +def test_resolve_source_git_no_subpath_suggests_plugins( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], +): + """No subpath + no root plugin.json -> list available plugins.""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with ( + patch( + "subprocess.run", + side_effect=_mock_git_clone(plugins=["alpha", "beta"]), + ), + pytest.raises(typer.Exit), + ): + _resolve_source("https://github.com/org/repo.git") + captured = capsys.readouterr() + assert "alpha" in captured.err + assert "beta" in captured.err + + +def test_resolve_source_git_no_subpath_root_plugin(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """No subpath + root plugin.json -> returns root (existing behavior).""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with patch("subprocess.run", side_effect=_mock_git_clone(root_plugin=True)): + source, _ = _resolve_source("https://github.com/org/repo.git") + assert (source / "plugin.json").exists() + + +def test_resolve_source_git_subpath_traversal(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Subpath with '..' should be rejected.""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with ( + patch("subprocess.run", side_effect=_mock_git_clone(plugins=[])), + pytest.raises(typer.Exit), + ): + _resolve_source("https://github.com/org/repo.git/../../etc") + + +def test_resolve_source_git_no_subpath_no_plugins( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], +): + """No subpath + no root plugin.json + no sub-plugins -> plain error.""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with ( + patch("subprocess.run", side_effect=_mock_git_clone(plugins=[])), + pytest.raises(typer.Exit), + ): + _resolve_source("https://github.com/org/repo.git") + captured = capsys.readouterr() + assert "No plugin.json found" in captured.err + + +def test_resolve_source_git_short_url_with_subpath(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """GitHub short URL with subpath (no .git) returns the sub-directory.""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with patch( + "subprocess.run", + side_effect=_mock_git_clone(plugins=["my-plugin"]), + ): + source, _ = _resolve_source("https://github.com/org/repo/my-plugin") + assert source.name == "my-plugin" + assert (source / "plugin.json").exists() + + +def test_resolve_source_git_tree_url_passes_branch(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """tree/{branch}/ URL should pass --branch to git clone.""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with patch( + "subprocess.run", + side_effect=_mock_git_clone(plugins=["my-plugin"]), + ) as mock_run: + source, _ = _resolve_source("https://github.com/org/repo/tree/develop/my-plugin") + # Verify --branch develop was passed to git clone + cmd = mock_run.call_args[0][0] + assert "--branch" in cmd + assert "develop" in cmd + assert source.name == "my-plugin" + + +def test_resolve_source_git_no_branch_omits_flag(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Non-tree URL should not pass --branch to git clone.""" + monkeypatch.setattr("tempfile.mkdtemp", lambda **kw: str(tmp_path / "tmp")) + (tmp_path / "tmp").mkdir() + + with patch( + "subprocess.run", + side_effect=_mock_git_clone(plugins=["my-plugin"]), + ) as mock_run: + _resolve_source("https://github.com/org/repo.git/my-plugin") + cmd = mock_run.call_args[0][0] + assert "--branch" not in cmd