Coverage for src/git_dag/git_repository.py: 96%
296 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 12:49 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 12:49 +0200
1"""Git repository parsing functionality."""
3from __future__ import annotations
5import logging
6import multiprocessing
7import re
8from functools import wraps
9from operator import itemgetter
10from pathlib import Path
11from time import time
12from typing import Annotated, Any, Callable, Optional, Type, cast
14from pydantic import BeforeValidator, TypeAdapter
16from git_dag.exceptions import CalledProcessCustomError
18from .constants import GIT_EMPTY_TREE_OBJECT_SHA, SHA_PATTERN, DictStrStr
19from .dag import DagVisualizer
20from .git_commands import GitCommand
21from .git_objects import (
22 GitBlob,
23 GitBranch,
24 GitCommit,
25 GitCommitRawDataType,
26 GitHead,
27 GitObject,
28 GitObjectKind,
29 GitStash,
30 GitTag,
31 GitTagLightweight,
32 GitTagRawDataType,
33 GitTree,
34 GitTreeRawDataType,
35)
36from .parameters import Params
37from .utils import creator_timestamp_format
# Module logger. NOTE: ``basicConfig`` is invoked at import time with level WARNING.
logging.basicConfig(level=logging.WARNING)
LOG = logging.getLogger(__name__)

# Getter returning the ``(sha, kind)`` pair of a parsed object descriptor.
IG = itemgetter("sha", "kind")

# The empty tree object, see https://stackoverflow.com/q/9765453
# For example it is created when using git rebase -i --root
GIT_EMPTY_TREE_OBJECT = GitTree(
    raw_data=[],
    no_children=True,
    sha=GIT_EMPTY_TREE_OBJECT_SHA,
)
52def time_it[R, **P](f: Callable[P, R]) -> Callable[P, R]:
53 """Return decorator for timing.
55 Note
56 -----
57 The generic ``P`` is a ``ParamSpec``.
59 """
61 @wraps(f)
62 def wrap(*args: P.args, **kwargs: P.kwargs) -> R:
63 ts = time()
64 result = f(*args, **kwargs)
65 te = time()
66 LOG.info(f"{f.__qualname__:<30} took: {te-ts:0.5f} sec")
67 return result
69 return wrap
class RegexParser:
    """Regex parser for files associated with git objects.

    All methods are static; the class only serves as a namespace.

    Note
    -----
    All this is quite ad hoc.

    """

    @staticmethod
    def parse_object_descriptor(string: str) -> DictStrStr:
        """Parse an object descriptor with format ``SHA OBJECT_TYPE``.

        Returns
        --------
        DictStrStr
            A dict with keys ``sha`` and ``kind``.

        Raises
        -------
        RuntimeError
            If ``string`` doesn't have the expected format.

        """
        pattern = f"^{SHA_PATTERN} (?P<kind>.+)"
        match = re.search(pattern, string)
        if match:
            return {"sha": match.group("sha"), "kind": match.group("kind")}
        raise RuntimeError(f'Object string "{string}" not matched.')  # pragma: no cover

    @staticmethod
    def parse_tree_info(data: Optional[list[str]] = None) -> GitTreeRawDataType:
        """Parse a tree object file (read with ``cat-file -p``).

        Parameters
        -----------
        data
            Lines of the tree object file (``None`` is handled as an empty tree).

        Returns
        --------
        GitTreeRawDataType
            One dict (keys ``sha`` and ``kind``) per tree/blob entry. Entries that
            refer to commits are skipped.

        Raises
        -------
        RuntimeError
            If a line doesn't have the expected format.

        """
        # for the empty tree object, data = [""]
        if data is None or (len(data) == 1 and not data[0]):
            return []

        # in the presence of submodules, trees may refer to commits as well
        pattern = re.compile(f"(?P<kind>tree|blob|commit) {SHA_PATTERN}\t")
        output = []
        for string in data:
            match = pattern.search(string)
            if match is None:
                raise RuntimeError(
                    f'Tree string "{string}" not matched.'
                )  # pragma: no cover

            kind = match.group("kind")
            if kind != "commit":  # skip references to commits
                output.append({"sha": match.group("sha"), "kind": kind})

        return output

    @staticmethod
    def _collect_commit_info(
        commit_object_data: list[DictStrStr],
        misc_info: list[str],
    ) -> GitCommitRawDataType:
        """Collect commit related info.

        Parameters
        -----------
        commit_object_data
            Parsed ``tree``/``parent`` lines (dicts with keys ``sha`` and ``kind``).
        misc_info
            The remaining lines of the commit object file: the author line, the
            committer line and everything that follows (used to form the message).

        Raises
        -------
        ValueError
            If more than one tree is found.
        RuntimeError
            If an entry is neither a ``tree`` nor a ``parent``.

        """

        def strip_creator_label(string: str) -> str:
            """Remove the author/committer label.

            E.g., remove the "author" from "author First Last <first.last.mail.com>".
            """
            return " ".join(string.split()[1:])

        def extract_message(misc_info: list[str]) -> str:
            """Join the non-empty lines that follow the author/committer lines."""
            return "\n".join(
                [
                    string.strip()
                    for string in misc_info[2:]  # skip author and committer
                    if string and not string.startswith("Co-authored-by")
                ]
            )

        parents = []
        tree = ""
        for d in commit_object_data:
            sha, kind = IG(d)
            if kind == "tree":
                if tree:
                    raise ValueError(
                        "Exactly one tree expected per commit."
                    )  # pragma: no cover
                tree = sha
            elif kind == "parent":
                parents.append(sha)
            else:
                raise RuntimeError("It is not expected to be here!")  # pragma: no cover

        author, author_email, author_date = creator_timestamp_format(
            strip_creator_label(misc_info[0])
        )
        committer, committer_email, committer_date = creator_timestamp_format(
            strip_creator_label(misc_info[1])
        )
        return {
            "tree": tree,
            "parents": parents,
            "message": extract_message(misc_info),
            "author": author,
            "author_email": author_email,
            "author_date": author_date,
            "committer": committer,
            "committer_email": committer_email,
            "committer_date": committer_date,
        }

    @staticmethod
    def parse_commit_info(data: list[str]) -> GitCommitRawDataType:
        """Parse a commit object file (read with ``git cat-file -p``)."""
        pattern = re.compile(f"^(?P<kind>tree|parent) {SHA_PATTERN}")
        output, misc_info = [], []
        # The tree and the parents always come first in the object file of a commit.
        # Next is the author, and this is the start of what I call "misc info".
        # collect_misc_info is used to avoid matching a commit message like "tree SHA".
        collect_misc_info = False
        for string in data:
            match = pattern.search(string)
            if not collect_misc_info and match:
                output.append({"sha": match.group("sha"), "kind": match.group("kind")})
            else:
                collect_misc_info = True
                misc_info.append(string)

        return RegexParser._collect_commit_info(output, misc_info)

    @staticmethod
    def parse_tag_info(data: list[str]) -> GitTagRawDataType:
        """Parse a tag object file (read using ``git cat-file -p``).

        The first four lines are expected to be, in this order, the ``object``,
        ``type``, ``tag`` and ``tagger`` lines. The message is taken from the sixth
        line on.

        Raises
        -------
        RuntimeError
            If one of the first four lines doesn't have the expected format.

        """
        labels = ["sha", "type", "refname", "tagger"]
        patterns = [
            f"^object {SHA_PATTERN}",
            "^type (?P<type>.+)",
            "^tag (?P<refname>.+)",
            "^tagger (?P<tagger>.+)",
        ]

        output = {}
        for pattern, string, label in zip(patterns, data, labels):
            match = re.search(pattern, string)
            if match:
                output[label] = match.group(label)
            else:
                raise RuntimeError(
                    f'Tag string "{string}" not matched.'
                )  # pragma: no cover

        tagger, tagger_email, tag_date = creator_timestamp_format(output["tagger"])
        output["taggername"] = tagger
        output["taggeremail"] = tagger_email
        output["taggerdate"] = tag_date
        output["message"] = "\n".join(data[5:])
        output["anchor"] = output.pop("sha")
        output["tag"] = output["refname"]  # abusing things a bit
        return output

    @staticmethod
    def parse_stash_info(data: Optional[list[str]]) -> list[DictStrStr]:
        """Parse stash info as returned by :func:`GitCommand.get_stash_info`.

        Parameters
        -----------
        data
            Lines with format ``SHA stash@{INDEX} TITLE`` (``None`` or an empty list
            results in an empty output).

        Returns
        --------
        list[DictStrStr]
            One dict with keys ``index``, ``sha`` and ``title`` per line.

        Raises
        -------
        RuntimeError
            If a line doesn't have the expected format.

        """
        if not data:
            return []

        # "{{" and "}}" are literal braces in the f-string (to match "stash@{N}")
        pattern = re.compile(
            f"{SHA_PATTERN} stash@{{(?P<index>[0-9]+)}} (?P<title>.*)"
        )
        keys = ["index", "sha", "title"]

        out = []
        for string in data:
            match = pattern.search(string)
            if match is None:
                # an f-string is required to actually report the offending line
                raise RuntimeError(
                    f'Stash string "{string}" not matched.'
                )  # pragma: no cover
            out.append({key: match.group(key) for key in keys})

        return out
240class GitInspector:
241 """Git inspector."""
243 @time_it
244 def __init__(self, repository_path: str | Path = ".", parse_trees: bool = False):
245 """Initialize instance (read most required info from the repository).
247 Parameters
248 -----------
249 repository_path
250 Path to the git repository.
251 parse_trees
252 Whether to parse the tree objects (doing this can be very slow and is best
253 omitted for anything other than small repos). FIXME: currenlty all tree
254 objects are parsed even if we intend to display only a small part of them.
256 """
257 self.parse_trees = parse_trees
258 self.repository_path = repository_path
259 self.git = GitCommand(repository_path)
261 self.objects_sha_kind = self.git.get_objects_sha_kind()
262 self.commits_sha = self._get_commits_sha()
263 self.commits_info = self._get_commits_info()
264 self.tags_info_parsed = self.git.get_tags_info_parsed()
265 self.trees_info = self._get_trees_info() if self.parse_trees else {}
266 self.blobs_and_trees_names: DictStrStr = self.git.get_blobs_and_trees_names(
267 self.trees_info
268 )
269 self.stashes_info_parsed = RegexParser.parse_stash_info(
270 self.git.get_stash_info()
271 )
272 self.notes_dag_root = self.git.get_notes_dag_root()
274 def _get_commits_sha(self) -> dict[str, set[str]]:
275 """Return SHA of all reachable/unreachable commits.
277 Note
278 -----
279 Git handles stashes through the reflog and it keeps only the last stash in
280 ``.git/refs/stash`` (see output of ``git reflog stash``). Hence, we consider
281 commits associated with earlier stashes to be unreachable (as they are not
282 referred by any reference).
284 """
285 reachable_commits = set(self.git.rev_list("--all").strip().split("\n"))
286 all_commits = set(
287 obj.split()[0] for obj in self.objects_sha_kind if "commit" in obj
288 )
289 return {
290 "all": all_commits,
291 "reachable": reachable_commits,
292 "unreachable": all_commits - reachable_commits,
293 }
295 def _get_commits_info(self) -> dict[str, list[str]]:
296 """Get content of object files for all commits.
298 Note
299 -----
300 It is much faster to read the info for all commits using ``git rev-list --all
301 --reflog --header`` instead of using ``git cat-file -p SHA`` per commit. The
302 ``--reflog`` flag includes unreachable commits as well.
304 Warning
305 --------
306 In some cases, ``git rev-list --all --reflog`` doesn't return all unreachable
307 commits (when this happens, the corresponding object files are read using ``git
308 cat-file -p``).
310 """
311 commits_info = {}
312 for info in self.git.rev_list("--all --reflog --header").split("\x00"):
313 if info:
314 commit_sha, *rest = info.split("\n")
315 commits_info[commit_sha] = rest
317 numb_commits_not_found = len(self.commits_sha["all"]) - len(commits_info)
318 if numb_commits_not_found > 0: 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true
319 LOG.info(
320 f"{numb_commits_not_found} commits not found in "
321 "git rev-list --all --reflog"
322 )
323 elif numb_commits_not_found < 0:
324 raise RuntimeError("We shouldn't be here.") # pragma: no cover
326 return commits_info
328 @time_it
329 def _get_trees_info(self) -> dict[str, list[str]]:
330 """Get content of object files for all trees.
332 Warning
333 --------
334 This is slow! I simply don't know how to speed-up this operation. I ended-up
335 using multiprocessing but there must be a better way. In ``GitPython`` they
336 interact with ``git cat-file --batch`` with streams (to explore). It seems
337 strange to be able to read all object files for commits at once (using ``git
338 rev-list``) and to not be able to do it for trees (I must be missing something).
339 FIXME: to find a better way to do this.
341 """
342 all_sha = [obj.split()[0] for obj in self.objects_sha_kind if "tree" in obj]
343 with multiprocessing.Pool() as pool:
344 object_file_content = pool.map(
345 self.git.ls_tree,
346 all_sha,
347 )
348 return dict(zip(all_sha, object_file_content))
350 def _get_objects_info_parsed(self, sha: str, kind: str) -> GitObject:
351 match kind:
352 case GitObjectKind.blob:
353 return GitBlob(sha=sha)
354 case GitObjectKind.commit:
355 if sha in self.commits_info: 355 ↛ 358line 355 didn't jump to line 358 because the condition on line 355 was always true
356 commit_info = self.commits_info[sha]
357 else:
358 commit_info = self.git.read_object_file(sha) # slower
359 LOG.info(f"[commit] manually executing git cat-file -p {sha}")
361 return GitCommit(
362 sha=sha,
363 is_reachable=sha in self.commits_sha["reachable"],
364 raw_data=RegexParser.parse_commit_info(commit_info),
365 )
366 case GitObjectKind.tag:
367 try:
368 tag = self.tags_info_parsed["annotated"][sha]
369 is_deleted = False
370 except KeyError:
371 # slower (used only for deleted annotated tags)
372 tag = RegexParser.parse_tag_info(self.git.read_object_file(sha))
373 is_deleted = True
375 return GitTag(
376 sha=sha,
377 name=tag["refname"],
378 raw_data=tag,
379 is_deleted=is_deleted,
380 )
381 case GitObjectKind.tree:
382 return GitTree(
383 sha=sha,
384 raw_data=RegexParser.parse_tree_info(self.trees_info.get(sha)),
385 )
386 case _: # pragma: no cover
387 raise RuntimeError("Leaking objects!")
389 @time_it
390 def get_raw_objects(self) -> dict[str, GitObject]:
391 """Return all raw objects in a git repository.
393 Note
394 -----
395 The objects are "raw", in the sense that they are not fully initialized. For
396 example, consider a :class:`~git_dag.git_objects.GitTree` object. Even
397 though all necessary data is available in
398 :attr:`~git_dag.git_objects.GitTree.raw_data`, the ``GitTree._children``
399 field is still not initialized (and the
400 :class:`~git_dag.git_objects.GitTree` instances are not fully functional).
401 The remaining post-processing is performed in
402 :func:`~git_dag.git_repository.GitRepository.post_process_inspector_data` (as
403 all instances need to be formed first). The
404 :attr:`~git_dag.git_objects.GitObject.is_ready` property indicates whether
405 an instance has been fully initialized.
407 """
409 def git_entity_before_validator(object_descriptor: str) -> GitObject:
410 """Transform/validate data.
412 Note
413 -----
414 ``self`` is used from the closure.
416 """
417 return self._get_objects_info_parsed(
418 *IG(RegexParser.parse_object_descriptor(object_descriptor))
419 )
421 GitObjectAnnotated = Annotated[
422 GitObject,
423 BeforeValidator(git_entity_before_validator),
424 ]
426 return {
427 obj.sha: obj
428 for obj in TypeAdapter(list[GitObjectAnnotated]).validate_python(
429 self.objects_sha_kind
430 )
431 }
434class GitRepository:
435 """Git repository.
437 Note
438 -----
439 All git objects are processed (optionally tree objects can be skipped). This seems
440 fine even for large repositories, e.g., it takes less than 20 sec. to process the
441 repository of git itself which has 75K commits (without reading the tree object
442 files).
444 """
446 def __init__(
447 self,
448 repository_path: str | Path = ".",
449 parse_trees: bool = False,
450 ) -> None:
451 """Initialize instance.
453 Parameters
454 -----------
455 repository_path
456 Path to the git repository.
457 parse_trees
458 Whether to parse the tree objects (doing this can be very slow).
460 """
461 if not Path(repository_path).exists():
462 raise RuntimeError(f"Path {repository_path} doesn't exist.")
464 self.inspector = GitInspector(repository_path, parse_trees)
465 self.post_process_inspector_data()
467 @time_it
468 def post_process_inspector_data(self) -> None:
469 """Post-process inspector data (see :func:`GitInspector.get_raw_objects`)."""
470 self.objects: dict[str, GitObject] = self._form_objects()
471 self.all_reachable_objects_sha: set[str] = self.get_all_reachable_objects()
472 self.commits = self.filter_objects(GitCommit)
473 self.tags: dict[str, GitTag] = self._form_annotated_tags()
474 self.tags_lw: dict[str, GitTagLightweight] = self._form_lightweight_tags()
475 self.remotes: list[str] = self.inspector.git.get_remotes()
476 self.branches: list[GitBranch] = self._form_branches()
477 self.head: GitHead = self._form_local_head()
478 self.remote_heads: DictStrStr = self._form_remote_heads()
479 self.stashes: list[GitStash] = self._form_stashes()
480 self.notes_dag_root: Optional[DictStrStr] = self.inspector.notes_dag_root
482 @time_it
483 def _form_branches(self) -> list[GitBranch]:
484 """Post-process branches."""
485 branches_raw = self.inspector.git.get_branches(self.remotes)
486 branches: list[GitBranch] = []
488 for branch_name, sha in branches_raw["local"].items():
489 branches.append(
490 GitBranch(
491 name=branch_name,
492 commit=self.commits[sha],
493 is_local=True,
494 tracking=self.inspector.git.local_branch_is_tracking(branch_name),
495 )
496 )
498 for branch_name, sha in branches_raw["remote"].items():
499 branches.append(
500 GitBranch(
501 name=branch_name,
502 commit=self.commits[sha],
503 )
504 )
506 return branches
508 @time_it
509 def _form_local_head(self) -> GitHead:
510 """Post-process HEAD."""
511 try:
512 head_commit_sha = self.inspector.git.get_local_head_commit_sha()
513 except CalledProcessCustomError:
514 LOG.warning("No Head")
515 return GitHead()
517 head_branch_name = self.inspector.git.get_local_head_branch()
518 if head_branch_name is None: 518 ↛ 519line 518 didn't jump to line 519 because the condition on line 518 was never true
519 return GitHead(commit=self.commits[head_commit_sha])
521 head_branch = [b for b in self.branches if b.name == head_branch_name]
522 if len(head_branch) != 1: 522 ↛ 523line 522 didn't jump to line 523 because the condition on line 522 was never true
523 raise RuntimeError("Head branch not found!")
525 return GitHead(commit=self.commits[head_commit_sha], branch=head_branch[0])
527 @time_it
528 def _form_remote_heads(self) -> DictStrStr:
529 """Form remote HEADs."""
530 return self.inspector.git.get_remote_heads_sym_ref(self.remotes)
532 @time_it
533 def _form_annotated_tags(self) -> dict[str, GitTag]:
534 """Post-process annotated tags."""
535 tags = {}
536 for sha, obj in self.objects.items():
537 match obj:
538 case GitTag():
539 tags[sha] = obj
541 return tags
543 @time_it
544 def _form_lightweight_tags(self) -> dict[str, GitTagLightweight]:
545 """Post-process lightweight tags."""
546 lw_tags = {}
547 for name, tag in self.inspector.tags_info_parsed["lightweight"].items():
548 lw_tags[name] = GitTagLightweight(
549 name=name,
550 anchor=self.objects[tag["anchor"]],
551 )
553 return lw_tags
555 @time_it
556 def _form_objects(self) -> dict[str, GitObject]:
557 """Post-process objects."""
558 git_objects = self.inspector.get_raw_objects()
560 # Commits can heve an empty tree object but it isn't returned by:
561 # git cat-file --batch-all-objects --batch-check="%(objectname) %(objecttype)"
562 # FIXME: maybe it is possible to pass a flag to git cat-file to include it?
563 # Meanwhile I detect it manually.
564 git_empty_tree_object_exists = False
565 for obj in git_objects.values():
566 match obj:
567 case GitCommit():
568 tree_key = cast(str, obj.raw_data["tree"])
569 parent_keys = cast(list[str], obj.raw_data["parents"])
571 if tree_key == GIT_EMPTY_TREE_OBJECT.sha:
572 obj.tree = GIT_EMPTY_TREE_OBJECT
573 git_empty_tree_object_exists = True
574 else:
575 # I prefer for the key-lookup to fail if tree_key is missing
576 obj.tree = cast(GitTree, git_objects[tree_key])
578 try:
579 obj.parents = cast(
580 list[GitCommit], [git_objects[sha] for sha in parent_keys]
581 )
582 except KeyError:
583 # the only way to be here is if the repo is cloned with --depth
584 obj.parents = []
585 case GitTree():
586 obj.children = [
587 cast(GitTree | GitBlob, git_objects[child["sha"]])
588 for child in obj.raw_data
589 ]
590 case GitTag():
591 obj.anchor = git_objects[obj.raw_data["anchor"]]
592 case GitBlob(): 592 ↛ 565line 592 didn't jump to line 565 because the pattern on line 592 always matched
593 pass # no need of post-processing
595 # add the empty tree if it was detected
596 if git_empty_tree_object_exists:
597 git_objects[GIT_EMPTY_TREE_OBJECT.sha] = GIT_EMPTY_TREE_OBJECT
599 for obj in git_objects.values():
600 obj.is_ready = True # type: ignore[method-assign]
602 return git_objects
604 @time_it
605 def _form_stashes(self) -> list[GitStash]:
606 """Post-process stashes."""
607 return [
608 GitStash(
609 index=int(stash["index"]),
610 title=stash["title"],
611 commit=self.commits[stash["sha"]],
612 )
613 for stash in self.inspector.stashes_info_parsed
614 ]
616 @time_it
617 def get_all_reachable_objects(self) -> set[str]:
618 """Return all reachable objects (from all refs and reflog)."""
619 cmd = "--all --reflog --objects --no-object-names"
620 out = self.inspector.git.rev_list(cmd).strip().split("\n")
621 return set() if len(out) == 1 and "" in out else set(out)
623 @time_it
624 def get_objects_reachable_from(
625 self,
626 init_refs: Optional[list[str]],
627 max_numb_commits: Optional[int] = None,
628 ) -> set[str]:
629 """Return SHA of all objects that are reachable from ``init_refs``."""
630 cla = " ".join(init_refs) if init_refs else "--all --reflog"
631 cmd = f"{cla} --objects --no-object-names"
632 if max_numb_commits is not None: 632 ↛ 635line 632 didn't jump to line 635 because the condition on line 632 was always true
633 cmd += f" -n {max_numb_commits}"
635 cmd_output = self.inspector.git.rev_list(cmd).strip().split("\n")
636 return set() if len(cmd_output) == 1 and "" in cmd_output else set(cmd_output)
638 def filter_objects[T: GitObject](self, object_type: Type[T]) -> dict[str, T]:
639 """Filter objects."""
640 return {
641 sha: obj
642 for sha, obj in self.objects.items()
643 if isinstance(obj, object_type)
644 }
646 @time_it
647 def show(self, params: Optional[Params] = None) -> Any:
648 """Show dag."""
650 if params is None: 650 ↛ 651line 650 didn't jump to line 651 because the condition on line 650 was never true
651 params = Params()
653 max_numb_commits = (
654 None
655 if params.public.max_numb_commits < 1
656 else params.public.max_numb_commits
657 )
659 if not params.public.init_refs and max_numb_commits is None:
660 objects_sha_to_include = None
661 else:
662 objects_sha_to_include = self.get_objects_reachable_from(
663 params.public.init_refs,
664 max_numb_commits,
665 )
667 return DagVisualizer(
668 repository=self,
669 params=params,
670 objects_sha_to_include=objects_sha_to_include,
671 in_range_commits=(
672 self.inspector.git.rev_list_range(params.public.range_expr)
673 ),
674 ).show(params.public.xdg_open)
676 def __repr__(self) -> str:
677 local_branches = [b for b in self.branches if b.is_local]
678 remote_branches = [b for b in self.branches if not b.is_local]
680 out = (
681 f"[GitRepository: {self.inspector.repository_path}]\n"
682 f" parsed trees : {self.inspector.parse_trees}\n"
683 f" objects : {len(self.inspector.objects_sha_kind)}\n"
684 f" commits (reachable) : {len(self.inspector.commits_sha['reachable'])}\n"
685 f" commits (unreachable): {len(self.inspector.commits_sha['unreachable'])}\n"
686 f" tags (annotated) : {len(self.tags)}\n"
687 f" tags (lightweight) : {len(self.tags_lw)}\n"
688 f" branches (remote) : {len(remote_branches)}\n"
689 f" branches (local) : {len(local_branches)}"
690 )
691 for branch in local_branches:
692 out += f"\n {branch.name}"
694 out += f"\n HEAD: {self.head}"
695 if self.stashes: 695 ↛ 700line 695 didn't jump to line 700 because the condition on line 695 was always true
696 out += f"\n stashes: {len(self.stashes)}"
697 for stash in self.stashes:
698 out += f"\n stash@{ {stash.index}} : {stash.title[:40]}"
700 return out