diff --git a/conftest.py b/conftest.py
index 36e8cd3e..bed66bc7 100644
--- a/conftest.py
+++ b/conftest.py
@@ -22,7 +22,7 @@
 @pytest.fixture(autouse=True)
 def add_doctest_fixtures(
     request: pytest.FixtureRequest,
-    doctest_namespace: dict[str, t.Any],
+    doctest_namespace: dict[str, object],
 ) -> None:
     """Harness pytest fixtures to doctests namespace."""
     from _pytest.doctest import DoctestItem
diff --git a/scripts/generate_gitlab.py b/scripts/generate_gitlab.py
index 0274bd26..ab8e7523 100755
--- a/scripts/generate_gitlab.py
+++ b/scripts/generate_gitlab.py
@@ -12,7 +12,6 @@
 import requests
 import yaml
 
-from libvcs.sync.git import GitRemote
 from vcspull.cli.sync import CouldNotGuessVCSFromURL, guess_vcs
 
 
@@ -108,11 +107,10 @@
         "path": path / reponame,
         "url": f"git+ssh://{url_to_repo}",
         "remotes": {
-            "origin": GitRemote(
-                name="origin",
-                fetch_url=f"ssh://{url_to_repo}",
-                push_url=f"ssh://{url_to_repo}",
-            ),
+            "origin": {
+                "fetch_url": f"ssh://{url_to_repo}",
+                "push_url": f"ssh://{url_to_repo}",
+            },
         },
         "vcs": vcs,
     }
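Reviewer note (scripts/generate_gitlab.py): the "remotes" value is now built from plain nested dicts rather than GitRemote instances. A minimal, runnable sketch of the resulting entry shape; the host/project values are hypothetical, and the assumption that downstream consumers accept mapping-shaped remotes comes from this patch, not from verifying libvcs:

    # Hypothetical values; the real script derives url_to_repo from the GitLab API.
    url_to_repo = "gitlab.example.com/group/project.git"
    entry = {
        "url": f"git+ssh://{url_to_repo}",
        "remotes": {
            # Plain dict replaces GitRemote(name=..., fetch_url=..., push_url=...)
            "origin": {
                "fetch_url": f"ssh://{url_to_repo}",
                "push_url": f"ssh://{url_to_repo}",
            },
        },
        "vcs": "git",
    }
    assert entry["remotes"]["origin"]["fetch_url"] == f"ssh://{url_to_repo}"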
diff --git a/src/vcspull/_internal/config_reader.py b/src/vcspull/_internal/config_reader.py
index 4313d793..10b67b85 100644
--- a/src/vcspull/_internal/config_reader.py
+++ b/src/vcspull/_internal/config_reader.py
@@ -8,7 +8,7 @@
 import yaml
 
 FormatLiteral = t.Literal["json", "yaml"]
-RawConfigData: t.TypeAlias = dict[t.Any, t.Any]
+RawConfigData: t.TypeAlias = dict[str, object]
 
 
 class ConfigReader:
@@ -25,7 +25,7 @@ def __init__(self, content: RawConfigData) -> None:
         self.content = content
 
     @staticmethod
-    def _load(fmt: FormatLiteral, content: str) -> dict[str, t.Any]:
+    def _load(fmt: FormatLiteral, content: str) -> dict[str, object]:
         """Load raw config data and directly return it.
 
         >>> ConfigReader._load("json", '{ "session_name": "my session" }')
@@ -36,14 +36,14 @@ def _load(fmt: FormatLiteral, content: str) -> dict[str, t.Any]:
         """
         if fmt == "yaml":
             return t.cast(
-                "dict[str, t.Any]",
+                "dict[str, object]",
                 yaml.load(
                     content,
                     Loader=yaml.SafeLoader,
                 ),
             )
         if fmt == "json":
-            return t.cast("dict[str, t.Any]", json.loads(content))
+            return t.cast("dict[str, object]", json.loads(content))
         msg = f"{fmt} not supported in configuration"
         raise NotImplementedError(msg)
 
@@ -71,7 +71,7 @@ def load(cls, fmt: FormatLiteral, content: str) -> ConfigReader:
         )
 
     @classmethod
-    def _from_file(cls, path: pathlib.Path) -> dict[str, t.Any]:
+    def _from_file(cls, path: pathlib.Path) -> dict[str, object]:
         r"""Load data from file path directly to dictionary.
 
         **YAML file**
@@ -175,7 +175,7 @@ def _dump(
         fmt: FormatLiteral,
         content: RawConfigData,
         indent: int = 2,
-        **kwargs: t.Any,
+        **kwargs: object,
     ) -> str:
         r"""Dump directly.
 
@@ -200,7 +200,7 @@ def _dump(
         msg = f"{fmt} not supported in config"
         raise NotImplementedError(msg)
 
-    def dump(self, fmt: FormatLiteral, indent: int = 2, **kwargs: t.Any) -> str:
+    def dump(self, fmt: FormatLiteral, indent: int = 2, **kwargs: object) -> str:
         r"""Dump via ConfigReader instance.
 
         >>> cfg = ConfigReader({ "session_name": "my session" })
@@ -222,23 +222,23 @@ class _DuplicateTrackingSafeLoader(yaml.SafeLoader):
 
     def __init__(self, stream: str) -> None:
         super().__init__(stream)
-        self.top_level_key_values: dict[t.Any, list[t.Any]] = {}
+        self.top_level_key_values: dict[object, list[object]] = {}
         self._mapping_depth = 0
-        self.top_level_items: list[tuple[t.Any, t.Any]] = []
+        self.top_level_items: list[tuple[object, object]] = []
 
 
 def _duplicate_tracking_construct_mapping(
     loader: _DuplicateTrackingSafeLoader,
     node: yaml.nodes.MappingNode,
     deep: bool = False,
-) -> dict[t.Any, t.Any]:
+) -> dict[object, object]:
     loader._mapping_depth += 1
     loader.flatten_mapping(node)
-    mapping: dict[t.Any, t.Any] = {}
+    mapping: dict[object, object] = {}
     for key_node, value_node in node.value:
         construct = t.cast(
-            "t.Callable[[yaml.nodes.Node], t.Any]",
+            "t.Callable[[yaml.nodes.Node], object]",
             loader.construct_object,
         )
         key = construct(key_node)
@@ -268,20 +268,20 @@ def __init__(
         self,
         content: RawConfigData,
         *,
-        duplicate_sections: dict[str, list[t.Any]] | None = None,
-        top_level_items: list[tuple[str, t.Any]] | None = None,
+        duplicate_sections: dict[str, list[object]] | None = None,
+        top_level_items: list[tuple[str, object]] | None = None,
     ) -> None:
         super().__init__(content)
         self._duplicate_sections = duplicate_sections or {}
         self._top_level_items = top_level_items or []
 
     @property
-    def duplicate_sections(self) -> dict[str, list[t.Any]]:
+    def duplicate_sections(self) -> dict[str, list[object]]:
         """Mapping of top-level keys to the list of duplicated values."""
         return self._duplicate_sections
 
     @property
-    def top_level_items(self) -> list[tuple[str, t.Any]]:
+    def top_level_items(self) -> list[tuple[str, object]]:
         """Ordered list of top-level items, including duplicates."""
         return copy.deepcopy(self._top_level_items)
 
@@ -289,7 +289,7 @@ def top_level_items(self) -> list[tuple[str, t.Any]]:
     def _load_yaml_with_duplicates(
         cls,
         content: str,
-    ) -> tuple[dict[str, t.Any], dict[str, list[t.Any]], list[tuple[str, t.Any]]]:
+    ) -> tuple[dict[str, object], dict[str, list[object]], list[tuple[str, object]]]:
         loader = _DuplicateTrackingSafeLoader(content)
 
         try:
@@ -299,12 +299,12 @@ def _load_yaml_with_duplicates(
             dispose()
 
         if data is None:
-            loaded: dict[str, t.Any] = {}
+            loaded: dict[str, object] = {}
         else:
             if not isinstance(data, dict):
                 msg = "Loaded configuration is not a mapping"
                 raise TypeError(msg)
-            loaded = t.cast("dict[str, t.Any]", data)
+            loaded = t.cast("dict[str, object]", data)
 
         duplicate_sections = {
             t.cast("str", key): values
@@ -323,7 +323,7 @@ def _load_from_path(
         cls,
         path: pathlib.Path,
-    ) -> tuple[dict[str, t.Any], dict[str, list[t.Any]], list[tuple[str, t.Any]]]:
+    ) -> tuple[dict[str, object], dict[str, list[object]], list[tuple[str, object]]]:
         if path.suffix.lower() in {".yaml", ".yml"}:
             content = path.read_text(encoding="utf-8")
             return cls._load_yaml_with_duplicates(content)
@@ -340,7 +340,7 @@ def from_file(cls, path: pathlib.Path) -> DuplicateAwareConfigReader:
         )
 
     @classmethod
-    def _from_file(cls, path: pathlib.Path) -> dict[str, t.Any]:
+    def _from_file(cls, path: pathlib.Path) -> dict[str, object]:
         content, _, _ = cls._load_from_path(path)
         return content
 
@@ -348,6 +348,6 @@ def load_with_duplicates(
         cls,
         path: pathlib.Path,
-    ) -> tuple[dict[str, t.Any], dict[str, list[t.Any]], list[tuple[str, t.Any]]]:
+    ) -> tuple[dict[str, object], dict[str, list[object]], list[tuple[str, object]]]:
         reader = cls.from_file(path)
         return reader.content, reader.duplicate_sections, reader.top_level_items
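Reviewer note (config_reader.py): switching from dict[str, t.Any] to dict[str, object] means callers can no longer index blindly; they must narrow values before use. A minimal, runnable sketch of the narrowing pattern the rest of the patch adopts; first_url is a hypothetical helper, not part of the codebase:

    from __future__ import annotations

    import typing as t

    def first_url(config: dict[str, object]) -> str | None:
        """Hypothetical helper: pull the first repo URL out of a loaded config."""
        for section in config.values():
            if not isinstance(section, dict):
                continue  # values are `object`, so isinstance-narrow first
            for repo in section.values():
                if isinstance(repo, str):
                    return repo  # compact "name: url" form
                if isinstance(repo, dict) and isinstance(repo.get("repo"), str):
                    return t.cast("str", repo["repo"])
        return None

    loaded: dict[str, object] = {
        "~/code/": {"vcspull": "git+https://github.com/vcs-python/vcspull"},
    }
    assert first_url(loaded) == "git+https://github.com/vcs-python/vcspull"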
diff --git a/src/vcspull/_internal/private_path.py b/src/vcspull/_internal/private_path.py
index 367d3dfd..4d501393 100644
--- a/src/vcspull/_internal/private_path.py
+++ b/src/vcspull/_internal/private_path.py
@@ -34,7 +34,11 @@ class PrivatePath(PrivatePathBase):
     '~/notes.txt'
     """
 
-    def __new__(cls, *args: t.Any, **kwargs: t.Any) -> PrivatePath:
+    def __new__(
+        cls,
+        *args: str | os.PathLike[str],
+        **kwargs: object,
+    ) -> PrivatePath:
         return super().__new__(cls, *args, **kwargs)
 
     @classmethod
diff --git a/src/vcspull/cli/__init__.py b/src/vcspull/cli/__init__.py
index 5a181c20..f6eb7a3c 100644
--- a/src/vcspull/cli/__init__.py
+++ b/src/vcspull/cli/__init__.py
@@ -25,6 +25,16 @@
 log = logging.getLogger(__name__)
 
 
+SubparserTuple: t.TypeAlias = tuple[
+    argparse.ArgumentParser,
+    argparse.ArgumentParser,
+    argparse.ArgumentParser,
+    argparse.ArgumentParser,
+    argparse.ArgumentParser,
+    argparse.ArgumentParser,
+]
+
+
 def build_description(
     intro: str,
     example_blocks: t.Sequence[tuple[str | None, t.Sequence[str]]],
@@ -208,7 +218,7 @@ def build_description(
 @overload
 def create_parser(
     return_subparsers: t.Literal[True],
-) -> tuple[argparse.ArgumentParser, t.Any]: ...
+) -> tuple[argparse.ArgumentParser, SubparserTuple]: ...
 
 
 @overload
@@ -217,7 +227,7 @@ def create_parser(return_subparsers: t.Literal[False]) -> argparse.ArgumentParse
 
 def create_parser(
     return_subparsers: bool = False,
-) -> argparse.ArgumentParser | tuple[argparse.ArgumentParser, t.Any]:
+) -> argparse.ArgumentParser | tuple[argparse.ArgumentParser, SubparserTuple]:
     """Create CLI argument parser for vcspull."""
     parser = argparse.ArgumentParser(
         prog="vcspull",
diff --git a/src/vcspull/cli/_formatter.py b/src/vcspull/cli/_formatter.py
index da7ffcc1..04661b8e 100644
--- a/src/vcspull/cli/_formatter.py
+++ b/src/vcspull/cli/_formatter.py
@@ -35,11 +35,21 @@
 }
 
 
+class _HelpTheme(t.Protocol):
+    heading: str
+    reset: str
+    label: str
+    long_option: str
+    short_option: str
+    prog: str
+    action: str
+
+
 class VcspullHelpFormatter(argparse.RawDescriptionHelpFormatter):
     """Render description blocks while colorizing example sections when possible."""
 
     def _fill_text(self, text: str, width: int, indent: str) -> str:
-        theme = getattr(self, "_theme", None)
+        theme = t.cast("_HelpTheme | None", getattr(self, "_theme", None))
         if not text or theme is None:
             return super()._fill_text(text, width, indent)
 
@@ -93,7 +103,7 @@ def _colorize_example_line(
         self,
         content: str,
         *,
-        theme: t.Any,
+        theme: _HelpTheme,
         expect_value: bool,
     ) -> _ColorizedLine:
         parts: list[str] = []
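Reviewer note (_formatter.py): the _HelpTheme Protocol types argparse's private _theme attribute structurally, so any object carrying the named string attributes matches without inheritance. A runnable stdlib sketch of that idea, with trimmed-down, hypothetical names:

    from __future__ import annotations

    import typing as t

    class Theme(t.Protocol):
        """Trimmed stand-in for _HelpTheme: any object with these attrs matches."""
        heading: str
        reset: str

    class FakeTheme:
        heading = "\x1b[1m"
        reset = "\x1b[0m"

    def render_heading(theme: Theme, text: str) -> str:
        # Structural typing: FakeTheme never names Theme, yet satisfies it.
        return f"{theme.heading}{text}{theme.reset}"

    assert "Examples:" in render_heading(FakeTheme(), "Examples:")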
diff --git a/src/vcspull/cli/_output.py b/src/vcspull/cli/_output.py
index 8c8a9bc6..ad6f375f 100644
--- a/src/vcspull/cli/_output.py
+++ b/src/vcspull/cli/_output.py
@@ -8,6 +8,64 @@
 from dataclasses import dataclass, field
 from enum import Enum
 
+import typing_extensions
+
+JsonPrimitive: t.TypeAlias = str | int | float | bool | None
+JsonValue: t.TypeAlias = JsonPrimitive | dict[str, "JsonValue"] | list["JsonValue"]
+JsonObject: t.TypeAlias = t.Mapping[str, JsonValue]
+OutputPayload: t.TypeAlias = JsonObject | "PlanEntryPayload" | "PlanSummaryPayload"
+
+
+class PlanEntryPayload(t.TypedDict):
+    """Typed JSON payload for a plan entry."""
+
+    format_version: str
+    type: str
+    name: str
+    path: str
+    workspace_root: str
+    action: str
+    detail: typing_extensions.NotRequired[str]
+    url: typing_extensions.NotRequired[str]
+    branch: typing_extensions.NotRequired[str]
+    remote_branch: typing_extensions.NotRequired[str]
+    current_rev: typing_extensions.NotRequired[str]
+    target_rev: typing_extensions.NotRequired[str]
+    ahead: typing_extensions.NotRequired[int]
+    behind: typing_extensions.NotRequired[int]
+    dirty: typing_extensions.NotRequired[bool]
+    error: typing_extensions.NotRequired[str]
+    diagnostics: typing_extensions.NotRequired[list[str]]
+
+
+class PlanSummaryPayload(t.TypedDict):
+    """Typed JSON payload for a plan summary."""
+
+    format_version: str
+    type: str
+    clone: int
+    update: int
+    unchanged: int
+    blocked: int
+    errors: int
+    total: int
+    duration_ms: typing_extensions.NotRequired[int]
+
+
+class PlanWorkspacePayload(t.TypedDict):
+    """Typed JSON payload for a workspace grouping."""
+
+    path: str
+    operations: list[PlanEntryPayload]
+
+
+class PlanResultPayload(t.TypedDict):
+    """Typed JSON payload for a plan result."""
+
+    format_version: str
+    workspaces: list[PlanWorkspacePayload]
+    summary: PlanSummaryPayload
+
 
 class OutputMode(Enum):
     """Output format modes."""
@@ -47,9 +105,9 @@ class PlanEntry:
     error: str | None = None
     diagnostics: list[str] = field(default_factory=list)
 
-    def to_payload(self) -> dict[str, t.Any]:
+    def to_payload(self) -> PlanEntryPayload:
         """Convert the plan entry into a serialisable payload."""
-        payload: dict[str, t.Any] = {
+        payload: PlanEntryPayload = {
             "format_version": "1",
             "type": "operation",
             "name": self.name,
@@ -97,9 +155,9 @@ def total(self) -> int:
         """Return the total number of repositories accounted for."""
         return self.clone + self.update + self.unchanged + self.blocked + self.errors
 
-    def to_payload(self) -> dict[str, t.Any]:
+    def to_payload(self) -> PlanSummaryPayload:
         """Convert the summary to a serialisable payload."""
-        payload: dict[str, t.Any] = {
+        payload: PlanSummaryPayload = {
             "format_version": "1",
             "type": "summary",
             "clone": self.clone,
@@ -139,9 +197,9 @@ def to_workspace_mapping(self) -> dict[str, list[PlanEntry]]:
             grouped.setdefault(entry.workspace_root, []).append(entry)
         return grouped
 
-    def to_json_object(self) -> dict[str, t.Any]:
+    def to_json_object(self) -> PlanResultPayload:
         """Return the JSON structure for ``--json`` output."""
-        workspaces: list[dict[str, t.Any]] = []
+        workspaces: list[PlanWorkspacePayload] = []
         for workspace_root, entries in self.to_workspace_mapping().items():
             workspaces.append(
                 {
@@ -168,17 +226,18 @@ def __init__(self, mode: OutputMode = OutputMode.HUMAN) -> None:
             The output mode to use (human, json, ndjson)
         """
         self.mode = mode
-        self._json_buffer: list[dict[str, t.Any]] = []
+        self._json_buffer: list[OutputPayload] = []
 
-    def emit(self, data: dict[str, t.Any] | PlanEntry | PlanSummary) -> None:
+    def emit(self, data: OutputPayload | PlanEntry | PlanSummary) -> None:
         """Emit a data event.
 
         Parameters
         ----------
-        data : dict | PlanEntry | PlanSummary
+        data : OutputPayload | PlanEntry | PlanSummary
             Event data to emit. PlanEntry and PlanSummary instances are
             serialised automatically.
         """
+        payload: OutputPayload
         if isinstance(data, (PlanEntry, PlanSummary)):
             payload = data.to_payload()
         else:
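Reviewer note (_output.py): the payload TypedDicts use NotRequired so optional JSON keys can simply be absent rather than set to None, which matches the existing serialisation behaviour. A runnable sketch with a cut-down, hypothetical stand-in class (assumes typing_extensions is available, as the patch itself imports it):

    from __future__ import annotations

    import typing as t

    import typing_extensions

    class EntryPayload(t.TypedDict):
        """Cut-down stand-in for PlanEntryPayload."""
        format_version: str
        type: str
        name: str
        detail: typing_extensions.NotRequired[str]

    payload: EntryPayload = {"format_version": "1", "type": "operation", "name": "repo"}
    assert "detail" not in payload  # NotRequired keys may simply be absent
    payload["detail"] = "already up to date"  # ...and added later when known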
""" + payload: OutputPayload if isinstance(data, (PlanEntry, PlanSummary)): payload = data.to_payload() else: diff --git a/src/vcspull/cli/add.py b/src/vcspull/cli/add.py index d979bcfc..cfc9aa11 100644 --- a/src/vcspull/cli/add.py +++ b/src/vcspull/cli/add.py @@ -27,6 +27,13 @@ log = logging.getLogger(__name__) +class _OrderedItem(t.TypedDict): + """Ordered config entry preserving label/section pairs.""" + + label: str + section: object + + def create_add_subparser(parser: argparse.ArgumentParser) -> None: """Create ``vcspull add`` argument subparser. @@ -164,40 +171,43 @@ def _normalize_detected_url(remote: str | None) -> tuple[str, str]: def _build_ordered_items( - top_level_items: list[tuple[str, t.Any]] | None, - raw_config: dict[str, t.Any], -) -> list[dict[str, t.Any]]: + top_level_items: list[tuple[str, object]] | None, + raw_config: dict[str, object], +) -> list[_OrderedItem]: """Return deep-copied top-level items preserving original ordering.""" - source: list[tuple[str, t.Any]] = top_level_items or list(raw_config.items()) + source: list[tuple[str, object]] = top_level_items or list(raw_config.items()) - ordered: list[dict[str, t.Any]] = [] + ordered: list[_OrderedItem] = [] for label, section in source: ordered.append({"label": label, "section": copy.deepcopy(section)}) return ordered def _aggregate_from_ordered_items( - items: list[dict[str, t.Any]], -) -> dict[str, t.Any]: + items: list[_OrderedItem], +) -> dict[str, object]: """Collapse ordered top-level items into a mapping grouped by label.""" - aggregated: dict[str, t.Any] = {} + aggregated: dict[str, object] = {} for entry in items: label = entry["label"] section = entry["section"] if isinstance(section, dict): - workspace_section = aggregated.setdefault(label, {}) + workspace_section = t.cast( + "dict[str, object]", + aggregated.setdefault(label, {}), + ) for repo_name, repo_config in section.items(): - workspace_section[repo_name] = copy.deepcopy(repo_config) + workspace_section[str(repo_name)] = copy.deepcopy(repo_config) else: aggregated[label] = copy.deepcopy(section) return aggregated def _collect_duplicate_sections( - items: list[dict[str, t.Any]], -) -> dict[str, list[t.Any]]: + items: list[_OrderedItem], +) -> dict[str, list[object]]: """Return mapping of labels to their repeated sections (>= 2 occurrences).""" - occurrences: dict[str, list[t.Any]] = {} + occurrences: dict[str, list[object]] = {} for entry in items: label = entry["label"] occurrences.setdefault(label, []).append(copy.deepcopy(entry["section"])) @@ -383,9 +393,9 @@ def add_repo( config_file_path = home_configs[0] # Load existing config - raw_config: dict[str, t.Any] - duplicate_root_occurrences: dict[str, list[t.Any]] - top_level_items: list[tuple[str, t.Any]] + raw_config: dict[str, object] + duplicate_root_occurrences: dict[str, list[object]] + top_level_items: list[tuple[str, object]] display_config_path = str(PrivatePath(config_file_path)) if config_file_path.exists() and config_file_path.is_file(): @@ -439,7 +449,7 @@ def add_repo( new_repo_entry = {"repo": url} def _ensure_workspace_label_for_merge( - config_data: dict[str, t.Any], + config_data: dict[str, object], ) -> tuple[str, bool]: workspace_map: dict[pathlib.Path, str] = {} for label, section in config_data.items(): @@ -473,7 +483,7 @@ def _ensure_workspace_label_for_merge( return workspace_label, relabelled def _prepare_no_merge_items( - items: list[dict[str, t.Any]], + items: list[_OrderedItem], ) -> tuple[str, int, bool]: matching_indexes: list[int] = [] for idx, entry in 
diff --git a/src/vcspull/cli/discover.py b/src/vcspull/cli/discover.py
index 39f8053f..7e83d2ca 100644
--- a/src/vcspull/cli/discover.py
+++ b/src/vcspull/cli/discover.py
@@ -261,8 +261,8 @@ def discover_repos(
     config_scope = _classify_config_scope(config_file_path, cwd=cwd, home=home)
     allow_relative_workspace = config_scope == "project"
 
-    raw_config: dict[str, t.Any]
-    duplicate_root_occurrences: dict[str, list[t.Any]]
+    raw_config: dict[str, object]
+    duplicate_root_occurrences: dict[str, list[object]]
     if config_file_path.exists() and config_file_path.is_file():
         try:
             (
@@ -595,9 +595,11 @@ def discover_repos(
             )
             workspace_map[workspace_path] = workspace_label
 
-        if workspace_label not in raw_config:
-            raw_config[workspace_label] = {}
-        elif not isinstance(raw_config[workspace_label], dict):
+        section = raw_config.get(workspace_label)
+        if section is None:
+            section = {}
+            raw_config[workspace_label] = section
+        elif not isinstance(section, dict):
             log.warning(
                 "Workspace root '%s' in config is not a dictionary. Skipping repo %s.",
                 workspace_label,
@@ -605,8 +607,9 @@ def discover_repos(
             )
             continue
 
-        if repo_name not in raw_config[workspace_label]:
-            raw_config[workspace_label][repo_name] = {"repo": repo_url}
+        workspace_section = t.cast("dict[str, object]", section)
+        if repo_name not in workspace_section:
+            workspace_section[repo_name] = {"repo": repo_url}
             log.info(
                 "%s+%s Importing %s'%s'%s (%s%s%s) under '%s%s%s'.",
                 Fore.GREEN,
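Reviewer note (discover.py): binding raw_config.get(...) to a local lets the type checker narrow one variable through the None / non-dict / dict branches, instead of re-indexing a dict[str, object] each time. A runnable sketch of the same pattern with hypothetical data:

    from __future__ import annotations

    import typing as t

    raw_config: dict[str, object] = {}
    workspace_label = "~/code/"

    section = raw_config.get(workspace_label)
    if section is None:
        section = {}
        raw_config[workspace_label] = section
    if isinstance(section, dict):  # mirrors the isinstance guard in the patch
        workspace_section = t.cast("dict[str, object]", section)
        workspace_section.setdefault("new-repo", {"repo": "git+https://example.com/new"})
    assert "new-repo" in t.cast("dict[str, object]", raw_config[workspace_label])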
diff --git a/src/vcspull/cli/fmt.py b/src/vcspull/cli/fmt.py
index 719f2bf4..a97ec2dd 100644
--- a/src/vcspull/cli/fmt.py
+++ b/src/vcspull/cli/fmt.py
@@ -23,6 +23,8 @@
 log = logging.getLogger(__name__)
 
+RepoConfigData: t.TypeAlias = str | pathlib.Path | t.Mapping[str, object]
+
 
 def create_fmt_subparser(parser: argparse.ArgumentParser) -> None:
     """Create ``vcspull fmt`` argument subparser."""
@@ -53,12 +55,12 @@ def create_fmt_subparser(parser: argparse.ArgumentParser) -> None:
     parser.set_defaults(merge_roots=True)
 
 
-def normalize_repo_config(repo_data: t.Any) -> dict[str, t.Any]:
+def normalize_repo_config(repo_data: RepoConfigData) -> dict[str, object]:
     """Normalize repository configuration to verbose format.
 
     Parameters
     ----------
-    repo_data : Any
+    repo_data : str | pathlib.Path | Mapping[str, object]
        Repository configuration (string URL or dict)
 
     Returns
@@ -69,19 +71,21 @@ def normalize_repo_config(repo_data: t.Any) -> dict[str, t.Any]:
     if isinstance(repo_data, str):
         # Convert compact format to verbose format
         return {"repo": repo_data}
-    if isinstance(repo_data, dict):
-        # If it has 'url' key but not 'repo', convert to use 'repo'
-        if "url" in repo_data and "repo" not in repo_data:
-            normalized = repo_data.copy()
-            normalized["repo"] = normalized.pop("url")
-            return normalized
-        # Already in correct format or has other fields
-        return repo_data
-    # Return as-is for other types
-    return t.cast("dict[str, t.Any]", repo_data)
-
-
-def format_config(config_data: dict[str, t.Any]) -> tuple[dict[str, t.Any], int]:
+    if isinstance(repo_data, pathlib.Path):
+        return {"repo": str(repo_data)}
+    repo_map = dict(repo_data)
+    # If it has 'url' key but not 'repo', convert to use 'repo'
+    if "url" in repo_map and "repo" not in repo_map:
+        normalized = repo_map.copy()
+        normalized["repo"] = normalized.pop("url")
+        return normalized
+    # Already in correct format or has other fields
+    return repo_map
+
+
+def format_config(
+    config_data: t.Mapping[str, object],
+) -> tuple[dict[str, object], int]:
     """Format vcspull configuration for consistency.
 
     Parameters
@@ -95,7 +99,7 @@ def format_config(config_data: dict[str, t.Any]) -> tuple[dict[str, t.Any], int]
         Formatted configuration and count of changes made
     """
     changes = 0
-    formatted: dict[str, t.Any] = {}
+    formatted: dict[str, object] = {}
 
     # Sort directories
     sorted_dirs = sorted(config_data.keys())
@@ -109,11 +113,12 @@ def format_config(config_data: dict[str, t.Any]) -> tuple[dict[str, t.Any], int]
             continue
 
         # Sort repositories within each directory
-        sorted_repos = sorted(repos.keys())
-        formatted_dir: dict[str, t.Any] = {}
+        repos_map = t.cast("dict[str, object]", repos)
+        sorted_repos = sorted(repos_map.keys())
+        formatted_dir: dict[str, object] = {}
 
         for repo_name in sorted_repos:
-            repo_data = repos[repo_name]
+            repo_data = t.cast("RepoConfigData", repos_map[repo_name])
             normalized = normalize_repo_config(repo_data)
 
             # Check if normalization changed anything
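Reviewer note (fmt.py): normalize_repo_config now covers exactly the three RepoConfigData shapes (str, pathlib.Path, Mapping) instead of falling through a cast. A behavioural sketch of the same logic, runnable in isolation; this re-implementation is illustrative, not the project's exact helper:

    from __future__ import annotations

    import pathlib
    import typing as t

    def normalize(repo: str | pathlib.Path | t.Mapping[str, object]) -> dict[str, object]:
        """Behavioural sketch of normalize_repo_config after this patch."""
        if isinstance(repo, str):
            return {"repo": repo}
        if isinstance(repo, pathlib.Path):
            return {"repo": str(repo)}
        out = dict(repo)
        if "url" in out and "repo" not in out:
            out["repo"] = out.pop("url")  # legacy 'url' key folded into 'repo'
        return out

    assert normalize("git+https://example.com/x") == {"repo": "git+https://example.com/x"}
    assert normalize({"url": "git+https://example.com/x"}) == {"repo": "git+https://example.com/x"}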
diff --git a/src/vcspull/cli/status.py b/src/vcspull/cli/status.py
index 2d5eaeb8..5d654928 100644
--- a/src/vcspull/cli/status.py
+++ b/src/vcspull/cli/status.py
@@ -19,7 +19,7 @@
     from vcspull.types import ConfigDict
 
 from ._colors import Colors, get_color_mode
-from ._output import OutputFormatter, get_output_mode
+from ._output import JsonObject, OutputFormatter, get_output_mode
 from ._workspaces import filter_by_workspace
 
 log = logging.getLogger(__name__)
@@ -28,6 +28,20 @@
 ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*m")
 
 
+class StatusResult(t.TypedDict):
+    """Typed status payload for a single repository."""
+
+    name: str
+    path: str
+    workspace_root: str
+    exists: bool
+    is_git: bool
+    clean: bool | None
+    branch: str | None
+    ahead: int | None
+    behind: int | None
+
+
 @dataclass
 class StatusCheckConfig:
     """Configuration options for status checking."""
@@ -174,7 +188,7 @@ async def _check_repos_status_async(
     *,
     config: StatusCheckConfig,
     progress: StatusProgressPrinter | None,
-) -> list[dict[str, t.Any]]:
+) -> list[StatusResult]:
     """Check repository status concurrently using asyncio.
 
     Parameters
@@ -188,18 +202,18 @@ async def _check_repos_status_async(
 
     Returns
     -------
-    list[dict[str, t.Any]]
+    list[StatusResult]
         List of status dictionaries in completion order
     """
     if not repos:
         return []
 
     semaphore = asyncio.Semaphore(min(config.max_concurrent, len(repos)))
-    results: list[dict[str, t.Any]] = []
+    results: list[StatusResult] = []
     exists_count = 0
     missing_count = 0
 
-    async def check_with_limit(repo: ConfigDict) -> dict[str, t.Any]:
+    async def check_with_limit(repo: ConfigDict) -> StatusResult:
         async with semaphore:
             return await asyncio.to_thread(
                 check_repo_status,
@@ -214,7 +228,7 @@ async def check_with_limit(repo: ConfigDict) -> dict[str, t.Any]:
             results.append(status)
 
             # Update counts for progress
-            if status.get("exists"):
+            if status["exists"]:
                 exists_count += 1
             else:
                 missing_count += 1
@@ -242,7 +256,7 @@ def _run_git_command(
         return None
 
 
-def check_repo_status(repo: ConfigDict, detailed: bool = False) -> dict[str, t.Any]:
+def check_repo_status(repo: ConfigDict, detailed: bool = False) -> StatusResult:
     """Check the status of a single repository.
 
     Parameters
@@ -261,7 +275,7 @@ def check_repo_status(repo: ConfigDict, detailed: bool = False) -> dict[str, t.A
     repo_name = repo.get("name", "unknown")
     workspace_root = repo.get("workspace_root", "")
 
-    status: dict[str, t.Any] = {
+    status: StatusResult = {
         "name": repo_name,
         "path": str(PrivatePath(repo_path)),
         "workspace_root": workspace_root,
@@ -451,25 +465,28 @@ def status_repos(
                 summary["missing"] += 1
 
             # Emit status
-            formatter.emit(
+            status_payload = t.cast(
+                "JsonObject",
                 {
                     "reason": "status",
                     **status,
                 },
             )
+            formatter.emit(status_payload)
 
             # Human output
             _format_status_line(status, formatter, colors, detailed)
 
     # Emit summary
-    summary_data: dict[str, t.Any] = {
-        "reason": "summary",
-        **summary,
-    }
-    if duration_ms is not None:
-        summary_data["duration_ms"] = duration_ms
-
-    formatter.emit(summary_data)
+    summary_payload = t.cast(
+        "JsonObject",
+        {
+            "reason": "summary",
+            **summary,
+            **({"duration_ms": duration_ms} if duration_ms is not None else {}),
+        },
+    )
+    formatter.emit(summary_payload)
 
     # Human summary
     formatter.emit_text(
@@ -482,7 +499,7 @@ def status_repos(
 
 
 def _format_status_line(
-    status: dict[str, t.Any],
+    status: StatusResult,
     formatter: OutputFormatter,
     colors: Colors,
     detailed: bool,
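Reviewer note (status.py): StatusResult is a total TypedDict, so every key must be present at construction, which is why check_repo_status seeds None/False defaults up front and why `status["exists"]` can replace `status.get("exists")`. A cut-down, runnable stand-in:

    from __future__ import annotations

    import typing as t

    class Status(t.TypedDict):
        """Cut-down stand-in for StatusResult; all keys are required."""
        name: str
        exists: bool
        branch: str | None

    # A total TypedDict has no optional fields: seed everything, narrow later.
    status: Status = {"name": "demo", "exists": False, "branch": None}
    status["exists"] = True
    assert status["branch"] is None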
diff --git a/src/vcspull/cli/sync.py b/src/vcspull/cli/sync.py
index 7855929a..a2f8a070 100644
--- a/src/vcspull/cli/sync.py
+++ b/src/vcspull/cli/sync.py
@@ -13,16 +13,18 @@
 import subprocess
 import sys
 import typing as t
-from collections.abc import Callable
 from copy import deepcopy
 from dataclasses import dataclass
 from datetime import datetime
 from io import StringIO
 from time import perf_counter
 
+from libvcs._internal.run import ProgressCallbackProtocol
 from libvcs._internal.shortcuts import create_project
 from libvcs._internal.types import VCSLiteral
 from libvcs.sync.git import GitSync
+from libvcs.sync.hg import HgSync
+from libvcs.sync.svn import SvnSync
 from libvcs.url import registry as url_tools
 
 from vcspull import exc
@@ -32,6 +34,8 @@
 
 from ._colors import Colors, get_color_mode
 from ._output import (
+    JsonObject,
+    JsonValue,
     OutputFormatter,
     OutputMode,
     PlanAction,
@@ -42,11 +46,46 @@
     get_output_mode,
 )
 from ._workspaces import filter_by_workspace
-from .status import check_repo_status
+from .status import StatusResult, check_repo_status
 
 log = logging.getLogger(__name__)
 
-ProgressCallback = Callable[[str, datetime], None]
+ProgressCallback: t.TypeAlias = ProgressCallbackProtocol
+
+
+class RepoPayloadBase(t.TypedDict):
+    """Keyword arguments used to create a repo via libvcs."""
+
+    url: str
+    path: str | os.PathLike[str]
+    progress_callback: ProgressCallback | None
+
+
+class GitRepoPayload(RepoPayloadBase):
+    """Keyword arguments for git repositories."""
+
+    vcs: t.Literal["git"]
+
+
+class HgRepoPayload(RepoPayloadBase):
+    """Keyword arguments for Mercurial repositories."""
+
+    vcs: t.Literal["hg"]
+
+
+class SvnRepoPayload(RepoPayloadBase):
+    """Keyword arguments for Subversion repositories."""
+
+    vcs: t.Literal["svn"]
+
+
+class RepoPayload(t.TypedDict):
+    """Keyword arguments used to create a repo via libvcs."""
+
+    url: str
+    path: str | os.PathLike[str]
+    vcs: VCSLiteral | None
+    progress_callback: ProgressCallback | None
 
 
 PLAN_SYMBOLS: dict[PlanAction, str] = {
@@ -187,7 +226,7 @@ def _maybe_fetch(
 
 
 def _determine_plan_action(
-    status: dict[str, t.Any],
+    status: StatusResult,
     *,
     config: SyncPlanConfig,
 ) -> tuple[PlanAction, str | None]:
@@ -718,7 +757,7 @@ def silent_progress(output: str, timestamp: datetime) -> None:
 
             summary["total"] += 1
 
-            event: dict[str, t.Any] = {
+            event: dict[str, JsonValue] = {
                 "reason": "sync",
                 "name": repo_name,
                 "path": display_repo_path,
@@ -763,12 +802,14 @@ def silent_progress(output: str, timestamp: datetime) -> None:
                     f"{colors.error(str(e))}",
                 )
                 if exit_on_error:
-                    formatter.emit(
+                    summary_payload = t.cast(
+                        "JsonObject",
                         {
                             "reason": "summary",
                             **summary,
                         },
                     )
+                    formatter.emit(summary_payload)
                     formatter.finalize()
                     if parser is not None:
                         parser.exit(status=1, message=EXIT_ON_ERROR_MSG)
@@ -783,12 +824,14 @@ def silent_progress(output: str, timestamp: datetime) -> None:
                     f"{colors.muted('→')} {display_repo_path}",
                 )
 
-    formatter.emit(
+    summary_payload = t.cast(
+        "JsonObject",
         {
             "reason": "summary",
             **summary,
         },
     )
+    formatter.emit(summary_payload)
 
     if formatter.mode == OutputMode.HUMAN:
         formatter.emit_text(
@@ -825,33 +868,44 @@ def guess_vcs(url: str) -> VCSLiteral | None:
 class CouldNotGuessVCSFromURL(exc.VCSPullException):
     """Raised when no VCS could be guessed from a URL."""
 
-    def __init__(self, repo_url: str, *args: object, **kwargs: object) -> None:
+    def __init__(self, repo_url: str) -> None:
         return super().__init__(f"Could not automatically determine VCS for {repo_url}")
 
 
 def update_repo(
-    repo_dict: t.Any,
+    repo_dict: ConfigDict,
     progress_callback: ProgressCallback | None = None,
     # repo_dict: Dict[str, Union[str, Dict[str, GitRemote], pathlib.Path]]
-) -> GitSync:
+) -> GitSync | HgSync | SvnSync:
     """Synchronize a single repository."""
-    repo_dict = deepcopy(repo_dict)
-    if "pip_url" not in repo_dict:
-        repo_dict["pip_url"] = repo_dict.pop("url")
-    if "url" not in repo_dict:
-        repo_dict["url"] = repo_dict.pop("pip_url")
+    repo_payload = t.cast("dict[str, object]", deepcopy(repo_dict))
+    if "pip_url" not in repo_payload:
+        repo_payload["pip_url"] = repo_payload.pop("url")
+    if "url" not in repo_payload:
+        repo_payload["url"] = repo_payload.pop("pip_url")
+
+    repo_payload["progress_callback"] = progress_callback or progress_cb
+
+    repo_url = t.cast("str", repo_payload["url"])
+    repo_vcs = t.cast("VCSLiteral | None", repo_payload.get("vcs"))
+    if repo_vcs is None:
+        vcs = guess_vcs(url=repo_url)
+        if vcs is None:
+            raise CouldNotGuessVCSFromURL(repo_url=repo_url)
 
-    repo_dict["progress_callback"] = progress_callback or progress_cb
+        repo_payload["vcs"] = vcs
+        repo_vcs = vcs
 
-    if repo_dict.get("vcs") is None:
-        vcs = guess_vcs(url=repo_dict["url"])
-        if vcs is None:
-            raise CouldNotGuessVCSFromURL(repo_url=repo_dict["url"])
+    assert repo_vcs is not None
 
-        repo_dict["vcs"] = vcs
+    r: GitSync | HgSync | SvnSync
+    if repo_vcs == "git":
+        r = create_project(**t.cast("GitRepoPayload", repo_payload))
+    elif repo_vcs == "svn":
+        r = create_project(**t.cast("SvnRepoPayload", repo_payload))
+    else:
+        r = create_project(**t.cast("HgRepoPayload", repo_payload))
 
-    r = create_project(**repo_dict)  # Creates the repo object
     r.update_repo(set_remotes=True)  # Creates repo if not exists and fetches
 
-    # TODO: Fix this
-    return r  # type:ignore
+    return r
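Reviewer note (sync.py): update_repo now narrows the "vcs" value to a literal before unpacking the payload, so each create_project call is checked against one concrete TypedDict instead of a blanket t.Any, and the old `# type:ignore` goes away. A runnable sketch of the cast-then-unpack mechanics; fake_create_project is a stand-in for libvcs's factory, not its real signature:

    from __future__ import annotations

    import typing as t

    class GitPayload(t.TypedDict):
        """Cut-down stand-in for GitRepoPayload."""
        url: str
        vcs: t.Literal["git"]

    def fake_create_project(*, url: str, vcs: str) -> dict[str, str]:
        """Stand-in factory, just to show the keyword unpack."""
        return {"url": url, "vcs": vcs}

    payload: dict[str, object] = {"url": "git+https://example.com/x", "vcs": "git"}
    if payload.get("vcs") == "git":
        # t.cast is a runtime no-op; it only tells the checker which shape applies.
        repo = fake_create_project(**t.cast("GitPayload", payload))
        assert repo["vcs"] == "git"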
""" - if isinstance(repo_data, str): - conf["url"] = repo_data + if isinstance(repo_data, (str, pathlib.Path)): + conf["url"] = str(repo_data) else: - conf = update_dict(conf, repo_data) + conf = update_dict( + conf, + t.cast("dict[str, object]", repo_data), + ) if "repo" in conf: if "url" not in conf: @@ -108,9 +111,9 @@ def extract_repos( conf["workspace_root"] = directory if "path" not in conf: + name = t.cast("str", conf["name"]) conf["path"] = expand_dir( - pathlib.Path(expand_dir(pathlib.Path(directory), cwd=cwd)) - / conf["name"], + pathlib.Path(expand_dir(pathlib.Path(directory), cwd=cwd)) / name, cwd, ) @@ -133,7 +136,7 @@ def extract_repos( **url, ) - def is_valid_config_dict(val: t.Any) -> t.TypeGuard[ConfigDict]: + def is_valid_config_dict(val: object) -> t.TypeGuard[ConfigDict]: assert isinstance(val, dict) return True @@ -460,7 +463,10 @@ def is_config_file( return any(filename.endswith(e) for e in extensions) -def save_config_yaml(config_file_path: pathlib.Path, data: dict[t.Any, t.Any]) -> None: +def save_config_yaml( + config_file_path: pathlib.Path, + data: t.Mapping[str, object], +) -> None: """Save configuration data to a YAML file. Parameters @@ -472,7 +478,7 @@ def save_config_yaml(config_file_path: pathlib.Path, data: dict[t.Any, t.Any]) - """ yaml_content = ConfigReader._dump( fmt="yaml", - content=data, + content=t.cast("dict[str, object]", data), indent=2, ) config_file_path.write_text(yaml_content, encoding="utf-8") @@ -480,7 +486,7 @@ def save_config_yaml(config_file_path: pathlib.Path, data: dict[t.Any, t.Any]) - def save_config_yaml_with_items( config_file_path: pathlib.Path, - items: list[tuple[str, t.Any]], + items: list[tuple[str, object]], ) -> None: """Persist configuration data while preserving duplicate top-level sections.""" documents: list[str] = [] @@ -488,7 +494,7 @@ def save_config_yaml_with_items( for label, section in items: dumped = ConfigReader._dump( fmt="yaml", - content={label: section}, + content=t.cast("dict[str, object]", {label: section}), indent=2, ).rstrip() if dumped: @@ -503,8 +509,8 @@ def save_config_yaml_with_items( def merge_duplicate_workspace_root_entries( label: str, - occurrences: list[t.Any], -) -> tuple[t.Any, list[str], int]: + occurrences: list[object], +) -> tuple[object, list[str], int]: """Merge duplicate entries for a single workspace root.""" conflicts: list[str] = [] change_count = max(len(occurrences) - 1, 0) @@ -521,7 +527,7 @@ def merge_duplicate_workspace_root_entries( ) return occurrences[-1], conflicts, change_count - merged: dict[str, t.Any] = {} + merged: dict[str, object] = {} for entry in occurrences: assert isinstance(entry, dict) @@ -540,9 +546,9 @@ def merge_duplicate_workspace_root_entries( def merge_duplicate_workspace_roots( - config_data: dict[str, t.Any], - duplicate_roots: dict[str, list[t.Any]], -) -> tuple[dict[str, t.Any], list[str], int, list[tuple[str, int]]]: + config_data: dict[str, object], + duplicate_roots: dict[str, list[object]], +) -> tuple[dict[str, object], list[str], int, list[tuple[str, int]]]: """Merge duplicate workspace root sections captured during load.""" if not duplicate_roots: return copy.deepcopy(config_data), [], 0, [] @@ -610,17 +616,17 @@ def workspace_root_label( def normalize_workspace_roots( - config_data: dict[str, t.Any], + config_data: t.Mapping[str, object], *, cwd: pathlib.Path | None = None, home: pathlib.Path | None = None, preserve_cwd_label: bool = True, -) -> tuple[dict[str, t.Any], dict[pathlib.Path, str], list[str], int]: +) -> tuple[dict[str, object], 
diff --git a/src/vcspull/log.py b/src/vcspull/log.py
index 7acdc802..8fecf43e 100644
--- a/src/vcspull/log.py
+++ b/src/vcspull/log.py
@@ -175,8 +175,23 @@ def template(self, record: logging.LogRecord) -> str:
 
         return "".join(reset + levelname + asctime + name + reset)
 
-    def __init__(self, color: bool = True, **kwargs: t.Any) -> None:
-        logging.Formatter.__init__(self, **kwargs)
+    def __init__(
+        self,
+        color: bool = True,
+        fmt: str | None = None,
+        datefmt: str | None = None,
+        style: t.Literal["%", "{", "$"] = "%",
+        validate: bool = True,
+        defaults: t.Mapping[str, object] | None = None,
+    ) -> None:
+        logging.Formatter.__init__(
+            self,
+            fmt=fmt,
+            datefmt=datefmt,
+            style=style,
+            validate=validate,
+            defaults=defaults,
+        )
 
     def format(self, record: logging.LogRecord) -> str:
         """Format log record."""
diff --git a/src/vcspull/types.py b/src/vcspull/types.py
index 6d8674eb..ad433a95 100644
--- a/src/vcspull/types.py
+++ b/src/vcspull/types.py
@@ -33,28 +33,24 @@
 import typing as t
 from typing import TypeAlias
 
-from typing_extensions import NotRequired, TypedDict
+import typing_extensions
 
 if t.TYPE_CHECKING:
-    from libvcs._internal.types import StrPath, VCSLiteral
+    from libvcs._internal.types import VCSLiteral
     from libvcs.sync.git import GitSyncRemoteDict
 
+RawRepoFieldValue: TypeAlias = (
+    str | list[str] | dict[str, str] | dict[str, dict[str, str]] | None
+)
+RawRepoConfigValue: TypeAlias = str | pathlib.Path | dict[str, RawRepoFieldValue]
+RawWorkspaceConfig: TypeAlias = dict[str, RawRepoConfigValue]
+RawConfigDict: TypeAlias = dict[str, RawWorkspaceConfig]
 
-class RawConfigDict(t.TypedDict):
-    """Configuration dictionary without any type marshalling or variable resolution."""
-
-    vcs: VCSLiteral
-    name: str
-    path: StrPath
-    url: str
-    remotes: GitSyncRemoteDict
-
-
-RawConfigDir = dict[str, RawConfigDict]
-RawConfig = dict[str, RawConfigDir]
+RawConfigDir: TypeAlias = RawWorkspaceConfig
+RawConfig: TypeAlias = RawConfigDict
 
 
-class ConfigDict(TypedDict):
+class ConfigDict(typing_extensions.TypedDict):
     """Configuration map for vcspull after shorthands and variables resolved."""
 
     vcs: VCSLiteral | None
@@ -62,8 +58,8 @@ class ConfigDict(TypedDict):
     path: pathlib.Path
     url: str
     workspace_root: str
-    remotes: NotRequired[GitSyncRemoteDict | None]
-    shell_command_after: NotRequired[list[str] | None]
+    remotes: typing_extensions.NotRequired[GitSyncRemoteDict | None]
+    shell_command_after: typing_extensions.NotRequired[list[str] | None]
 
 
 ConfigDir = dict[str, ConfigDict]
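Reviewer note (types.py): RawConfigDict changes from a TypedDict (which wrongly promised vcs/name/path on raw data) to a nested alias that mirrors what actually sits in a YAML file: workspace root, then repo name, then either a compact URL string or an expanded mapping. A runnable value that satisfies the new alias; the URLs are examples only:

    from __future__ import annotations

    raw_config = {
        "~/study/": {
            "linux": "git+git://git.kernel.org/linux/torvalds/linux.git",
            "vcspull": {
                "repo": "git+https://github.com/vcs-python/vcspull",
                "shell_command_after": ["echo synced"],
            },
        },
    }
    # Compact form is a plain string; expanded form is a mapping of fields.
    assert isinstance(raw_config["~/study/"]["linux"], str)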
diff --git a/src/vcspull/util.py b/src/vcspull/util.py
index 74496042..a9481f67 100644
--- a/src/vcspull/util.py
+++ b/src/vcspull/util.py
@@ -5,7 +5,7 @@
 import os
 import pathlib
 import typing as t
-from collections.abc import Mapping
+from collections.abc import Mapping, MutableMapping
 
 LEGACY_CONFIG_DIR = pathlib.Path("~/.vcspull/").expanduser()  # remove dupes of this
 
@@ -42,12 +42,12 @@ def get_config_dir() -> pathlib.Path:
     return path
 
 
-T = t.TypeVar("T", bound=dict[str, t.Any])
+T = t.TypeVar("T", bound=MutableMapping[str, object])
 
 
 def update_dict(
     d: T,
-    u: T,
+    u: Mapping[str, object],
 ) -> T:
     """Return updated dict.
 
@@ -67,7 +67,13 @@ def update_dict(
     """
     for k, v in u.items():
         if isinstance(v, Mapping):
-            r = update_dict(d.get(k, {}), v)
+            current = d.get(k)
+            if isinstance(current, MutableMapping):
+                r = update_dict(current, t.cast("Mapping[str, object]", v))
+            elif isinstance(current, Mapping):
+                r = update_dict(dict(current), t.cast("Mapping[str, object]", v))
+            else:
+                r = update_dict({}, t.cast("Mapping[str, object]", v))
             d[k] = r
         else:
             d[k] = v
diff --git a/src/vcspull/validator.py b/src/vcspull/validator.py
index b209566d..aa47ac1f 100644
--- a/src/vcspull/validator.py
+++ b/src/vcspull/validator.py
@@ -8,7 +8,7 @@
 from vcspull.types import RawConfigDict
 
 
-def is_valid_config(config: dict[str, t.Any]) -> t.TypeGuard[RawConfigDict]:
+def is_valid_config(config: dict[str, object]) -> t.TypeGuard[RawConfigDict]:
     """Return true and upcast if vcspull configuration file is valid."""
     if not isinstance(config, dict):
        return False
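Reviewer note (util.py): the merge semantics of update_dict are unchanged by this patch -- nested mappings merge recursively, scalars overwrite -- but read-only Mapping values are now rebuilt as dicts before recursing. A quick check of the expected behaviour, assuming vcspull is importable in the working tree:

    from vcspull.util import update_dict

    base: dict[str, object] = {"a": {"x": 1}, "b": 2}
    merged = update_dict(base, {"a": {"y": 2}, "b": 3})
    # Nested keys merge; top-level scalar "b" is overwritten.
    assert merged == {"a": {"x": 1, "y": 2}, "b": 3}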
repo" raise AssertionError(error_message) diff --git a/tests/cli/test_fmt.py b/tests/cli/test_fmt.py index dedb53c0..692acfb0 100644 --- a/tests/cli/test_fmt.py +++ b/tests/cli/test_fmt.py @@ -28,7 +28,7 @@ class WorkspaceRootFixture(t.NamedTuple): """Fixture for workspace root normalization cases.""" test_id: str - config_factory: t.Callable[[pathlib.Path], dict[str, t.Any]] + config_factory: t.Callable[[pathlib.Path], dict[str, object]] WORKSPACE_ROOT_FIXTURES: list[WorkspaceRootFixture] = [ @@ -81,7 +81,7 @@ class WorkspaceRootFixture(t.NamedTuple): ) def test_workspace_root_normalization( test_id: str, - config_factory: t.Callable[[pathlib.Path], dict[str, t.Any]], + config_factory: t.Callable[[pathlib.Path], dict[str, object]], snapshot_json: SnapshotAssertion, ) -> None: """Ensure format_config merges duplicate workspace roots.""" diff --git a/tests/cli/test_list.py b/tests/cli/test_list.py index e81b9ec4..93abff73 100644 --- a/tests/cli/test_list.py +++ b/tests/cli/test_list.py @@ -16,7 +16,10 @@ from _pytest.monkeypatch import MonkeyPatch -def create_test_config(config_path: pathlib.Path, repos: dict[str, t.Any]) -> None: +ConfigData: t.TypeAlias = dict[str, dict[str, dict[str, str]]] + + +def create_test_config(config_path: pathlib.Path, repos: ConfigData) -> None: """Create a test config file.""" with config_path.open("w", encoding="utf-8") as f: yaml.dump(repos, f) @@ -26,7 +29,7 @@ class ListReposFixture(t.NamedTuple): """Fixture for list repos test cases.""" test_id: str - config_data: dict[str, t.Any] + config_data: ConfigData patterns: list[str] tree: bool output_json: bool @@ -122,7 +125,7 @@ class ListReposFixture(t.NamedTuple): ) def test_list_repos( test_id: str, - config_data: dict[str, t.Any], + config_data: ConfigData, patterns: list[str], tree: bool, output_json: bool, diff --git a/tests/cli/test_plan_output_helpers.py b/tests/cli/test_plan_output_helpers.py index ff62a510..6ad73bac 100644 --- a/tests/cli/test_plan_output_helpers.py +++ b/tests/cli/test_plan_output_helpers.py @@ -8,6 +8,7 @@ from contextlib import redirect_stdout import pytest +import typing_extensions as te from vcspull.cli._colors import ColorMode, Colors from vcspull.cli._output import ( @@ -21,12 +22,32 @@ from vcspull.cli.sync import PlanProgressPrinter +class PlanEntryKwargs(t.TypedDict): + """Typed kwargs for PlanEntry construction in tests.""" + + name: str + path: str + workspace_root: str + action: PlanAction + detail: te.NotRequired[str] + url: te.NotRequired[str] + branch: te.NotRequired[str] + remote_branch: te.NotRequired[str] + current_rev: te.NotRequired[str] + target_rev: te.NotRequired[str] + ahead: te.NotRequired[int] + behind: te.NotRequired[int] + dirty: te.NotRequired[bool] + error: te.NotRequired[str] + diagnostics: te.NotRequired[list[str]] + + class PlanEntryPayloadFixture(t.NamedTuple): """Fixture for PlanEntry payload serialization.""" test_id: str - kwargs: dict[str, t.Any] - expected_keys: dict[str, t.Any] + kwargs: PlanEntryKwargs + expected_keys: dict[str, object] unexpected_keys: set[str] @@ -86,19 +107,20 @@ class PlanEntryPayloadFixture(t.NamedTuple): ) def test_plan_entry_to_payload( test_id: str, - kwargs: dict[str, t.Any], - expected_keys: dict[str, t.Any], + kwargs: PlanEntryKwargs, + expected_keys: dict[str, object], unexpected_keys: set[str], ) -> None: """Ensure PlanEntry serialises optional fields correctly.""" entry = PlanEntry(**kwargs) payload = entry.to_payload() + payload_map = t.cast(dict[str, object], payload) for key, value in expected_keys.items(): - 
diff --git a/tests/cli/test_fmt.py b/tests/cli/test_fmt.py
index dedb53c0..692acfb0 100644
--- a/tests/cli/test_fmt.py
+++ b/tests/cli/test_fmt.py
@@ -28,7 +28,7 @@ class WorkspaceRootFixture(t.NamedTuple):
     """Fixture for workspace root normalization cases."""
 
     test_id: str
-    config_factory: t.Callable[[pathlib.Path], dict[str, t.Any]]
+    config_factory: t.Callable[[pathlib.Path], dict[str, object]]
 
 
 WORKSPACE_ROOT_FIXTURES: list[WorkspaceRootFixture] = [
@@ -81,7 +81,7 @@ class WorkspaceRootFixture(t.NamedTuple):
 )
 def test_workspace_root_normalization(
     test_id: str,
-    config_factory: t.Callable[[pathlib.Path], dict[str, t.Any]],
+    config_factory: t.Callable[[pathlib.Path], dict[str, object]],
     snapshot_json: SnapshotAssertion,
 ) -> None:
     """Ensure format_config merges duplicate workspace roots."""
diff --git a/tests/cli/test_list.py b/tests/cli/test_list.py
index e81b9ec4..93abff73 100644
--- a/tests/cli/test_list.py
+++ b/tests/cli/test_list.py
@@ -16,7 +16,10 @@
     from _pytest.monkeypatch import MonkeyPatch
 
 
-def create_test_config(config_path: pathlib.Path, repos: dict[str, t.Any]) -> None:
+ConfigData: t.TypeAlias = dict[str, dict[str, dict[str, str]]]
+
+
+def create_test_config(config_path: pathlib.Path, repos: ConfigData) -> None:
     """Create a test config file."""
     with config_path.open("w", encoding="utf-8") as f:
         yaml.dump(repos, f)
@@ -26,7 +29,7 @@ class ListReposFixture(t.NamedTuple):
     """Fixture for list repos test cases."""
 
     test_id: str
-    config_data: dict[str, t.Any]
+    config_data: ConfigData
     patterns: list[str]
     tree: bool
     output_json: bool
@@ -122,7 +125,7 @@ class ListReposFixture(t.NamedTuple):
 )
 def test_list_repos(
     test_id: str,
-    config_data: dict[str, t.Any],
+    config_data: ConfigData,
     patterns: list[str],
     tree: bool,
     output_json: bool,
diff --git a/tests/cli/test_plan_output_helpers.py b/tests/cli/test_plan_output_helpers.py
index ff62a510..6ad73bac 100644
--- a/tests/cli/test_plan_output_helpers.py
+++ b/tests/cli/test_plan_output_helpers.py
@@ -8,6 +8,7 @@
 from contextlib import redirect_stdout
 
 import pytest
+import typing_extensions as te
 
 from vcspull.cli._colors import ColorMode, Colors
 from vcspull.cli._output import (
@@ -21,12 +22,32 @@
 from vcspull.cli.sync import PlanProgressPrinter
 
 
+class PlanEntryKwargs(t.TypedDict):
+    """Typed kwargs for PlanEntry construction in tests."""
+
+    name: str
+    path: str
+    workspace_root: str
+    action: PlanAction
+    detail: te.NotRequired[str]
+    url: te.NotRequired[str]
+    branch: te.NotRequired[str]
+    remote_branch: te.NotRequired[str]
+    current_rev: te.NotRequired[str]
+    target_rev: te.NotRequired[str]
+    ahead: te.NotRequired[int]
+    behind: te.NotRequired[int]
+    dirty: te.NotRequired[bool]
+    error: te.NotRequired[str]
+    diagnostics: te.NotRequired[list[str]]
+
+
 class PlanEntryPayloadFixture(t.NamedTuple):
     """Fixture for PlanEntry payload serialization."""
 
     test_id: str
-    kwargs: dict[str, t.Any]
-    expected_keys: dict[str, t.Any]
+    kwargs: PlanEntryKwargs
+    expected_keys: dict[str, object]
     unexpected_keys: set[str]
 
 
@@ -86,19 +107,20 @@ class PlanEntryPayloadFixture(t.NamedTuple):
 )
 def test_plan_entry_to_payload(
     test_id: str,
-    kwargs: dict[str, t.Any],
-    expected_keys: dict[str, t.Any],
+    kwargs: PlanEntryKwargs,
+    expected_keys: dict[str, object],
     unexpected_keys: set[str],
 ) -> None:
     """Ensure PlanEntry serialises optional fields correctly."""
     entry = PlanEntry(**kwargs)
     payload = entry.to_payload()
+    payload_map = t.cast(dict[str, object], payload)
 
     for key, value in expected_keys.items():
-        assert payload[key] == value
+        assert payload_map[key] == value
 
     for key in unexpected_keys:
-        assert key not in payload
+        assert key not in payload_map
 
     assert payload["format_version"] == "1"
     assert payload["type"] == "operation"
diff --git a/tests/cli/test_status.py b/tests/cli/test_status.py
index 9a47cd37..4eb8c441 100644
--- a/tests/cli/test_status.py
+++ b/tests/cli/test_status.py
@@ -11,6 +11,7 @@
 
 from vcspull.cli.status import (
     StatusCheckConfig,
+    StatusResult,
     _check_repos_status_async,
     check_repo_status,
     status_repos,
@@ -24,7 +25,10 @@
     from vcspull.types import ConfigDict
 
 
-def create_test_config(config_path: pathlib.Path, repos: dict[str, t.Any]) -> None:
+ConfigData: t.TypeAlias = dict[str, dict[str, dict[str, str]]]
+
+
+def create_test_config(config_path: pathlib.Path, repos: ConfigData) -> None:
     """Create a test config file."""
     with config_path.open("w", encoding="utf-8") as f:
         yaml.dump(repos, f)
@@ -223,7 +227,16 @@ def test_check_repo_status(
     else:
         repo_path.mkdir(parents=True)
 
-    repo_dict: t.Any = {"name": "test-repo", "path": str(repo_path)}
+    repo_dict = t.cast(
+        "ConfigDict",
+        {
+            "vcs": None,
+            "name": "test-repo",
+            "path": repo_path,
+            "url": str(repo_path),
+            "workspace_root": str(tmp_path),
+        },
+    )
 
     status = check_repo_status(repo_dict, detailed=False)
 
@@ -630,13 +643,24 @@ async def test_check_repos_status_async_concurrency_limit(
 ) -> None:
     """Test that semaphore limits concurrent operations."""
     # Create multiple repos
-    repos_list = []
+    repos_list: list[ConfigDict] = []
     for i in range(10):
         repo_path = tmp_path / f"repo{i}"
         init_git_repo(repo_path)
-        repos_list.append({"name": f"repo{i}", "path": str(repo_path)})
+        repos_list.append(
+            t.cast(
+                "ConfigDict",
+                {
+                    "vcs": None,
+                    "name": f"repo{i}",
+                    "path": repo_path,
+                    "url": str(repo_path),
+                    "workspace_root": str(tmp_path),
+                },
+            ),
+        )
 
-    repos = t.cast("list[ConfigDict]", repos_list)
+    repos = repos_list
 
     # Track concurrent calls
     concurrent_calls = []
@@ -644,7 +668,7 @@ async def test_check_repos_status_async_concurrency_limit(
 
     original_check = check_repo_status
 
-    def tracked_check(repo: t.Any, detailed: bool = False) -> dict[str, t.Any]:
+    def tracked_check(repo: ConfigDict, detailed: bool = False) -> StatusResult:
         concurrent_calls.append(1)
         nonlocal max_concurrent_seen
         current = len(concurrent_calls)
diff --git a/tests/cli/test_sync_plan_helpers.py b/tests/cli/test_sync_plan_helpers.py
index 1efa3ef7..1872429f 100644
--- a/tests/cli/test_sync_plan_helpers.py
+++ b/tests/cli/test_sync_plan_helpers.py
@@ -2,6 +2,8 @@
 
 from __future__ import annotations
 
+import collections.abc as cabc
+import os
 import subprocess
 import typing as t
 
@@ -13,6 +15,35 @@
 if t.TYPE_CHECKING:
     import pathlib
 
+    from vcspull.cli.status import StatusResult
+
+
+StatusOverride: t.TypeAlias = dict[str, bool | int | None]
+CompletedProcessArgs: t.TypeAlias = (
+    str
+    | bytes
+    | os.PathLike[str]
+    | os.PathLike[bytes]
+    | cabc.Sequence[str | bytes | os.PathLike[str] | os.PathLike[bytes]]
+)
+
+
+def _build_status(overrides: StatusOverride) -> StatusResult:
+    """Build a StatusResult with defaults for required keys."""
+    base: dict[str, object] = {
+        "name": "repo",
+        "path": "/tmp/repo",
+        "workspace_root": "/tmp",
+        "exists": False,
+        "is_git": False,
+        "clean": None,
+        "branch": None,
+        "ahead": None,
+        "behind": None,
+    }
+    base.update(overrides)
+    return t.cast("StatusResult", base)
+
 
 class MaybeFetchFixture(t.NamedTuple):
     """Fixture for _maybe_fetch behaviours."""
@@ -110,8 +141,8 @@ def test_maybe_fetch_behaviour(
     if subprocess_behavior:
 
         def _patched_run(
-            *args: t.Any,
-            **kwargs: t.Any,
+            args: CompletedProcessArgs,
+            **kwargs: object,
         ) -> subprocess.CompletedProcess[str]:
             if subprocess_behavior == "file-not-found":
                 error_message = "git executable not found"
@@ -121,13 +152,13 @@ def _patched_run(
                 raise OSError(error_message)
             if subprocess_behavior == "non-zero":
                 return subprocess.CompletedProcess(
-                    args=args[0],
+                    args=args,
                     returncode=1,
                     stdout="",
                     stderr="remote rejected",
                 )
             return subprocess.CompletedProcess(
-                args=args[0],
+                args=args,
                 returncode=0,
                 stdout="",
                 stderr="",
             )
@@ -147,7 +178,7 @@ class DeterminePlanActionFixture(t.NamedTuple):
     """Fixture for _determine_plan_action outcomes."""
 
     test_id: str
-    status: dict[str, t.Any]
+    status: StatusOverride
     config: SyncPlanConfig
     expected_action: PlanAction
     expected_detail: str
@@ -239,12 +270,15 @@ class DeterminePlanActionFixture(t.NamedTuple):
 )
 def test_determine_plan_action(
     test_id: str,
-    status: dict[str, t.Any],
+    status: StatusOverride,
     config: SyncPlanConfig,
     expected_action: PlanAction,
     expected_detail: str,
 ) -> None:
     """Verify _determine_plan_action handles edge cases."""
-    action, detail = _determine_plan_action(status, config=config)
+    action, detail = _determine_plan_action(
+        _build_status(status),
+        config=config,
+    )
 
     assert action is expected_action
     assert detail == expected_detail
Accepts yaml or json.""" return ConfigReader._load(fmt=fmt, content=data) diff --git a/tests/test_cli.py b/tests/test_cli.py index 48a9056b..af7ba89a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -16,11 +16,22 @@ from vcspull.__about__ import __version__ from vcspull._internal.private_path import PrivatePath from vcspull.cli import cli -from vcspull.cli._output import PlanAction, PlanEntry, PlanResult, PlanSummary +from vcspull.cli._output import ( + JsonValue, + PlanAction, + PlanEntry, + PlanResult, + PlanSummary, +) from vcspull.cli.sync import EXIT_ON_ERROR_MSG, NO_REPOS_FOR_TERM_MSG sync_module = importlib.import_module("vcspull.cli.sync") +RepoConfig: t.TypeAlias = dict[str, str | dict[str, str]] +ConfigData: t.TypeAlias = dict[str, dict[str, RepoConfig]] +OperationSubset: t.TypeAlias = dict[str, JsonValue] +RepoRecord: t.TypeAlias = dict[str, str] + if t.TYPE_CHECKING: from typing import TypeAlias @@ -652,7 +663,7 @@ def test_sync_dry_run_plan_human( if set_no_color: monkeypatch.setenv("NO_COLOR", "1") - config: dict[str, dict[str, dict[str, t.Any]]] = {"~/github_projects/": {}} + config: ConfigData = {"~/github_projects/": {}} for name in repository_names: config["~/github_projects/"][name] = { "url": f"git+file://{git_repo.path}", @@ -697,7 +708,7 @@ def test_sync_dry_run_plan_human( errors=sum(entry.action is PlanAction.ERROR for entry in plan_entries), ) - async def _fake_plan(*args: t.Any, **kwargs: t.Any) -> PlanResult: + async def _fake_plan(*args: object, **kwargs: object) -> PlanResult: return PlanResult(entries=plan_entries, summary=computed_summary) monkeypatch.setattr(sync_module, "_build_plan_result_async", _fake_plan) @@ -728,7 +739,7 @@ class DryRunPlanMachineFixture(t.NamedTuple): pre_sync: bool = True plan_entries: list[PlanEntry] | None = None plan_summary: PlanSummary | None = None - expected_operation_subset: dict[str, t.Any] | None = None + expected_operation_subset: OperationSubset | None = None DRY_RUN_PLAN_MACHINE_FIXTURES: list[DryRunPlanMachineFixture] = [ @@ -835,7 +846,7 @@ def test_sync_dry_run_plan_machine( pre_sync: bool, plan_entries: list[PlanEntry] | None, plan_summary: PlanSummary | None, - expected_operation_subset: dict[str, t.Any] | None, + expected_operation_subset: OperationSubset | None, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str], monkeypatch: pytest.MonkeyPatch, @@ -846,7 +857,7 @@ def test_sync_dry_run_plan_machine( """Validate machine-readable plan parity.""" monkeypatch.setenv("NO_COLOR", "1") - config: dict[str, dict[str, dict[str, t.Any]]] = {"~/github_projects/": {}} + config: ConfigData = {"~/github_projects/": {}} for name in repository_names: config["~/github_projects/"][name] = { "url": f"git+file://{git_repo.path}", @@ -889,7 +900,7 @@ def test_sync_dry_run_plan_machine( errors=sum(entry.action is PlanAction.ERROR for entry in plan_entries), ) - async def _fake_plan(*args: t.Any, **kwargs: t.Any) -> PlanResult: + async def _fake_plan(*args: object, **kwargs: object) -> PlanResult: return PlanResult(entries=plan_entries, summary=computed_summary) monkeypatch.setattr(sync_module, "_build_plan_result_async", _fake_plan) @@ -924,7 +935,7 @@ async def _fake_plan(*args: t.Any, **kwargs: t.Any) -> PlanResult: assert summary["errors"] == expected_summary["errors"] if mode == "json" and expected_operation_subset: - operations: list[dict[str, t.Any]] = [] + operations: list[dict[str, JsonValue]] = [] for workspace in payload["workspaces"]: operations.extend(workspace["operations"]) assert operations, "Expected at 
least one operation payload" @@ -1005,12 +1016,12 @@ def test_sync_human_output_redacts_repo_paths( ) def _fake_filter_repos( - _configs: list[dict[str, t.Any]], + _configs: list[RepoRecord], *, path: str | None = None, vcs_url: str | None = None, name: str | None = None, - ) -> list[dict[str, t.Any]]: + ) -> list[RepoRecord]: if name and name != repo_config["name"]: return [] if path and path != repo_config["path"]: diff --git a/tests/test_config.py b/tests/test_config.py index 55a4f000..28f918bc 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -24,7 +24,7 @@ def __call__( content: str, path: str = "randomdir", filename: str = "randomfilename.yaml", - ) -> tuple[pathlib.Path, list[t.Any | pathlib.Path], list[ConfigDict]]: + ) -> tuple[pathlib.Path, list[pathlib.Path], list[ConfigDict]]: """Callable function type signature for load_yaml pytest fixture.""" ... @@ -123,7 +123,7 @@ class ExtractWorkspaceFixture(t.NamedTuple): ) def test_extract_repos_injects_workspace_root( test_id: str, - raw_config: dict[str, dict[str, str | dict[str, str]]], + raw_config: RawConfigDict, expected_roots: dict[str, str], tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, @@ -134,8 +134,7 @@ def test_extract_repos_injects_workspace_root( monkeypatch.setenv("HOME", str(tmp_path)) monkeypatch.chdir(tmp_path) - typed_raw_config = t.cast("RawConfigDict", raw_config) - repos = config.extract_repos(typed_raw_config, cwd=tmp_path) + repos = config.extract_repos(raw_config, cwd=tmp_path) assert len(repos) == len(expected_roots) diff --git a/tests/test_config_writer.py b/tests/test_config_writer.py index fe4f547b..3cf3dea4 100644 --- a/tests/test_config_writer.py +++ b/tests/test_config_writer.py @@ -12,7 +12,7 @@ if t.TYPE_CHECKING: import pathlib -FixtureEntry = tuple[str, dict[str, t.Any]] +FixtureEntry = tuple[str, dict[str, dict[str, str]]] @pytest.mark.parametrize( diff --git a/tests/test_sync.py b/tests/test_sync.py index 9ba9deed..74f6c5ea 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -166,7 +166,7 @@ def test_config_variations( assert len(repos) == 1 for repo_dict in repos: - repo: GitSync = update_repo(repo_dict) + repo = t.cast("GitSync", update_repo(repo_dict)) remotes = repo.remotes() or {} remote_names = set(remotes.keys()) assert set(remote_list).issubset(remote_names) or {"origin"}.issubset( @@ -281,7 +281,7 @@ def test_updating_remote( for repo_dict in filter_repos( [initial_config], ): - local_git_remotes = update_repo(repo_dict).remotes() + local_git_remotes = t.cast("GitSync", update_repo(repo_dict)).remotes() assert "origin" in local_git_remotes expected_remote_url = f"git+file://{mirror_repo}" @@ -296,7 +296,7 @@ def test_updating_remote( repo_dict = filter_repos([expected_config], name="myclone")[0] assert isinstance(repo_dict, dict) - repo = update_repo(repo_dict) + repo = t.cast("GitSync", update_repo(repo_dict)) for remote_name in repo.remotes(): remote = repo.remote(remote_name) if remote is not None: