Skip to content
Draft
2 changes: 2 additions & 0 deletions bases/polylith/cli/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def sync_command(
def deps_command(
directory: Annotated[str, options.directory] = "",
brick: Annotated[str, options.brick] = "",
interface: Annotated[bool, options.interface] = False,
save: Annotated[bool, options.save] = False,
):
"""Visualize the dependencies between bricks."""
Expand All @@ -205,6 +206,7 @@ def deps_command(
"brick": brick or None,
"save": save,
"output": output,
"show_interface": interface,
}

commands.deps.run(root, ns, cli_options)
Expand Down
1 change: 1 addition & 0 deletions bases/polylith/cli/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@

brick = Option(help="Shows dependencies for selected brick.")
save = Option(help="Store the contents of this command to file.")
interface = Option(help="Show the brick interface.")
4 changes: 2 additions & 2 deletions components/polylith/check/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from polylith.check import collect, grouping, report
from polylith.check import collect, report

__all__ = ["collect", "grouping", "report"]
__all__ = ["collect", "report"]
4 changes: 2 additions & 2 deletions components/polylith/check/collect.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from pathlib import Path
from typing import Set

from polylith import check, imports, workspace
from polylith import imports, workspace


def extract_bricks(paths: Set[Path], ns: str) -> dict:
all_imports = imports.fetch_all_imports(paths)

return check.grouping.extract_brick_imports(all_imports, ns)
return imports.extract_brick_imports(all_imports, ns)


def with_unknown_components(root: Path, ns: str, brick_imports: dict) -> dict:
Expand Down
6 changes: 3 additions & 3 deletions components/polylith/check/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Set

from polylith import imports, libs, workspace
from polylith.check import collect, grouping
from polylith.check import collect
from polylith.reporting import theme
from rich.console import Console

Expand Down Expand Up @@ -78,8 +78,8 @@ def extract_collected_imports(
ns: str, imports_in_bases: dict, imports_in_components: dict
) -> dict:
brick_imports = {
"bases": grouping.extract_brick_imports(imports_in_bases, ns),
"components": grouping.extract_brick_imports(imports_in_components, ns),
"bases": imports.grouping.extract_brick_imports(imports_in_bases, ns),
"components": imports.grouping.extract_brick_imports(imports_in_components, ns),
}

third_party_imports = {
Expand Down
22 changes: 21 additions & 1 deletion components/polylith/commands/deps.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pathlib import Path
from typing import List, Set

from polylith import bricks, deps, info
from polylith import bricks, deps, info, interface


def get_imports(root: Path, ns: str, bricks: dict) -> dict:
Expand Down Expand Up @@ -30,9 +30,21 @@ def get_components(root: Path, ns: str, project_data: dict) -> Set[str]:
return pick_name(bricks.get_components_data(root, ns))


def used_by_as_bricks(bricks: dict, brick_deps: dict) -> dict:
bases = bricks["bases"]
components = bricks["components"]

used_by = brick_deps["used_by"]
return {
"bases": {b for b in used_by if b in bases},
"components": {b for b in used_by if b in components},
}


def run(root: Path, ns: str, options: dict):
directory = options.get("directory")
brick = options.get("brick")
show_interface = options.get("show_interface")

projects_data = info.get_projects_data(root, ns) if directory else []
project = next((p for p in projects_data if directory in p["path"].as_posix()), {})
Expand All @@ -53,13 +65,21 @@ def run(root: Path, ns: str, options: dict):

if brick and imports.get(brick):
brick_deps = bricks_deps[brick]
used_bricks = used_by_as_bricks(bricks, brick_deps)

circular_deps = circular_bricks.get(brick)

deps.print_brick_deps(brick, bricks, brick_deps, options)

if circular_deps:
deps.print_brick_with_circular_deps(brick, circular_deps, bricks)

if show_interface:
interface.report.print_brick_interface_invalid_usage(
root, ns, brick, used_bricks
)
interface.report.print_brick_interface(root, ns, brick, used_bricks)

return

deps.print_deps(bricks, imports, options)
Expand Down
12 changes: 12 additions & 0 deletions components/polylith/imports/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
from polylith.imports.grouping import (
extract_brick_imports,
extract_brick_imports_with_namespaces,
)
from polylith.imports.parser import (
extract_top_ns,
fetch_all_imports,
fetch_excluded_imports,
list_imports,
parse_module,
)
from polylith.imports.usages import SYMBOLS, extract_api, fetch_brick_import_usages

__all__ = [
"extract_brick_imports",
"extract_brick_imports_with_namespaces",
"extract_api",
"extract_top_ns",
"fetch_all_imports",
"fetch_brick_import_usages",
"fetch_excluded_imports",
"list_imports",
"parse_module",
"SYMBOLS",
]
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ def extract_brick_imports(all_imports: dict, top_ns) -> dict:
with_only_brick_names = only_brick_names(with_only_bricks)

return exclude_empty(with_only_brick_names)


def extract_brick_imports_with_namespaces(all_imports: dict, top_ns) -> dict:
with_only_bricks = only_bricks(all_imports, top_ns)

return exclude_empty(with_only_bricks)
8 changes: 4 additions & 4 deletions components/polylith/imports/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def parse_node(node: ast.AST) -> Union[dict, None]:
return None


def parse_module(path: Path) -> ast.AST:
def parse_module(path: Path) -> ast.Module:
with open(path.as_posix(), "r", encoding="utf-8", errors="ignore") as f:
tree = ast.parse(f.read(), path.name)

Expand All @@ -88,7 +88,7 @@ def extract_imports(path: Path) -> List[str]:
return [i for i in includes if i not in excludes]


def extract_and_flatten(py_modules: Iterable) -> Set[str]:
def extract_imports_and_flatten(py_modules: Iterable) -> Set[str]:
return {i for m in py_modules for i in extract_imports(m)}


Expand All @@ -104,7 +104,7 @@ def find_files(path: Path) -> Iterable:
def list_imports(path: Path) -> Set[str]:
py_modules = find_files(path)

return extract_and_flatten(py_modules)
return extract_imports_and_flatten(py_modules)


def fetch_all_imports(paths: Set[Path]) -> dict:
Expand All @@ -122,7 +122,7 @@ def list_excluded_imports(path: Path, excludes: Set[str]) -> Set[str]:

filtered = [p for p in py_modules if should_exclude(p, excludes)]

return extract_and_flatten(filtered)
return extract_imports_and_flatten(filtered)


def fetch_excluded_imports(paths: Set[Path], excludes: Set[str]) -> dict:
Expand Down
144 changes: 144 additions & 0 deletions components/polylith/imports/usages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import ast
from functools import lru_cache
from pathlib import Path
from typing import FrozenSet, Optional, Set, Tuple, Union

from polylith.imports.parser import extract_imports, find_files, parse_module

WRAPPER_NODES = (ast.Await, ast.Expr, ast.NamedExpr, ast.Starred, ast.Subscript)
FN_NODES = (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)
SYMBOLS = (*FN_NODES, ast.ClassDef)


def extract_api_part(path: str) -> str:
return path.rsplit(".", 1)[-1]


def extract_api(paths: Set[str]) -> Set[str]:
return {extract_api_part(p) for p in paths}


def find_import_root_and_path(
expr: ast.expr, parts: Tuple[str, ...] = ()
) -> Tuple[ast.expr, str]:
"""Builds a namespace when the expression is an Attribute or Name, otherwise empty."""
if isinstance(expr, ast.Attribute):
return find_import_root_and_path(expr.value, (*parts, expr.attr))

namespace_parts = (*parts, expr.id) if isinstance(expr, ast.Name) else parts

namespace = str.join(".", reversed(namespace_parts))

return expr, namespace


def with_ns(usage: str, ns: str) -> str:
return usage if str.startswith(usage, ns + ".") else f"{ns}.{usage}"


def find_matching_usage(expr: ast.expr, options: dict) -> Optional[str]:
ns = options["ns"]
api_map = options["api_map"]
allowed_prefixes = options["allowed_prefixes"]
shadowed = options["shadowed"]

root, usage = find_import_root_and_path(expr)

if not isinstance(root, ast.Name):
return None

if root.id in shadowed:
return None

if root.id in api_map:
found = api_map[root.id] if usage == root.id else usage

return with_ns(found, ns)

if any(usage.startswith(p + ".") for p in allowed_prefixes):
return with_ns(usage, ns)

return None


def parse_import_usage(node: ast.AST, options: dict) -> Union[str, None]:
usage = None
child = None

if isinstance(node, ast.Attribute):
usage = find_matching_usage(node, options)
child = node.value
elif isinstance(node, WRAPPER_NODES):
child = node.value
elif isinstance(node, ast.Call):
usage = find_matching_usage(node.func, options)
child = node.func
elif isinstance(node, ast.UnaryOp):
child = node.operand

if usage:
return usage

return parse_import_usage(child, options) if child is not None else None


def collect_arg_names(
fn: Union[ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda],
) -> Set[str]:
args = fn.args

names = {a.arg for a in args.posonlyargs + args.args + args.kwonlyargs}

if args.vararg:
names.add(args.vararg.arg)

if args.kwarg:
names.add(args.kwarg.arg)

return names


def walk_usages(node: ast.AST, options: dict) -> Set[str]:
if isinstance(node, FN_NODES):
options = {
**options,
"shadowed": options["shadowed"] | frozenset(collect_arg_names(node)),
}

out = set()
hit = parse_import_usage(node, options)

if hit:
out.add(hit)

for child in ast.iter_child_nodes(node):
out |= walk_usages(child, options)

return out


def fetch_import_usages_in_module(path: Path, ns: str, imported: Set[str]) -> Set[str]:
tree = parse_module(path)
api_map = {extract_api_part(p): p for p in imported}

options = {
"ns": ns,
"api_map": api_map,
"allowed_prefixes": frozenset(api_map.values()),
"shadowed": frozenset(),
}
return walk_usages(tree, options)


@lru_cache(maxsize=None)
def fetch_brick_import_usages(
path: Path, ns: str, imported: FrozenSet[str]
) -> Set[str]:
py_modules = find_files(path)

found = {m: set(extract_imports(m)).intersection(imported) for m in py_modules}
filtered = {k: v for k, v in found.items() if v}

fetched = (fetch_import_usages_in_module(k, ns, v) for k, v in filtered.items())

return {i for f in fetched if f for i in f}
3 changes: 2 additions & 1 deletion components/polylith/interface/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from polylith.interface import report
from polylith.interface.interfaces import create_interface

__all__ = ["create_interface"]
__all__ = ["create_interface", "report"]
Loading