Coverage for src/git_dag/git_commands.py: 86%
360 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 12:49 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 12:49 +0200
1"""Git commands related functionality.
3Note
4-----
5The are two kinds of commands: such that simply read data from the repository, and such
6that modify the repository (the latter is used only in unit tests).
8"""
10import logging
11import re
12import shlex
13import subprocess
14import tarfile
15import time
16from pathlib import Path
17from typing import Any, Literal, Optional
19from git_dag.constants import (
20 CMD_TAGS_INFO,
21 COMMIT_DATE,
22 SHA_PATTERN,
23 TAG_FORMAT_FIELDS,
24 DictStrStr,
25)
26from git_dag.exceptions import CalledProcessCustomError
27from git_dag.parameters import Params, ParamsPublic, context_ignore_config_file
28from git_dag.utils import escape_decode, increase_date
30logging.basicConfig(level=logging.WARNING)
31LOG = logging.getLogger(__name__)
34class GitCommandBase:
35 """Base class for git commands."""
37 def __init__(self, path: str | Path = ".") -> None:
38 """Initialize instance."""
39 self.path = path
40 self.command_prefix = f"git -C {path}"
42 def _run(
43 self,
44 command: str,
45 env: Optional[DictStrStr] = None,
46 encoding: str = "utf-8",
47 ) -> str:
48 """Run a git command."""
49 try:
50 return subprocess.run(
51 shlex.split(f"{self.command_prefix} {command}"),
52 stdout=subprocess.PIPE,
53 stderr=subprocess.PIPE,
54 check=True,
55 env=env,
56 ).stdout.decode(encoding, errors="replace")
57 except subprocess.CalledProcessError as e:
58 raise CalledProcessCustomError(e) from e
60 @staticmethod
61 def run_general(
62 command: str,
63 env: Optional[DictStrStr] = None,
64 encoding: str = "utf-8",
65 expected_stderr: Optional[str] = None,
66 ) -> str:
67 """Run a general command."""
68 with subprocess.Popen(
69 command,
70 shell=True,
71 stdout=subprocess.PIPE,
72 stderr=subprocess.PIPE,
73 env=env,
74 ) as process:
75 output, error = process.communicate()
76 # some git commands output messages to stderr even when there is no error
77 if error:
78 if expected_stderr is None:
79 raise RuntimeError(error)
80 if not expected_stderr in error.decode("utf-8"):
81 raise RuntimeError(error)
82 return output.decode(encoding, errors="replace").strip()
85class GitCommandMutate(GitCommandBase):
86 """Git commands that create/modify a repository.
88 Warning
89 --------
90 The functionality in this class is rudimentary and is used only to create a
91 repository for the tests.
93 """
95 def __init__(
96 self,
97 path: str | Path = ".", # assumed to exist
98 author: str = "First Last <first.last@mail.com>",
99 committer: str = "Nom Prenom <nom.prenom@mail.com>",
100 date: Optional[str] = None,
101 evolving_date: bool = False,
102 ) -> None:
103 """Initialize instance."""
104 self.author = author
105 self.committer = committer
106 self.date = date
107 self.evolving_date = evolving_date
109 super().__init__(path)
111 def init(self, branch: str = "main", bare: bool = False) -> None:
112 """Initialise a git repository."""
113 self._run(f"init -b {branch} {'--bare' if bare else ''}")
115 @property
116 def env(self) -> DictStrStr:
117 """Return environment with author and committer to pass to commands."""
118 env = {}
119 match = re.search("(?P<name>.*) (?P<email><.*>)", self.author)
120 if match:
121 env["GIT_AUTHOR_NAME"] = match.group("name")
122 env["GIT_AUTHOR_EMAIL"] = match.group("email")
123 else:
124 raise ValueError("Author not matched.") # pragma: no cover
126 match = re.search("(?P<name>.*) (?P<email><.*>)", self.committer)
127 if match:
128 env["GIT_COMMITTER_NAME"] = match.group("name")
129 env["GIT_COMMITTER_EMAIL"] = match.group("email")
130 else:
131 raise ValueError("Committer not matched.") # pragma: no cover
133 if self.date is not None: 133 ↛ 139line 133 didn't jump to line 139 because the condition on line 133 was always true
134 env["GIT_AUTHOR_DATE"] = self.date
135 env["GIT_COMMITTER_DATE"] = self.date
137 self.date = increase_date(self.date) if self.evolving_date else self.date
139 return env
141 def add(self, files: DictStrStr) -> None:
142 """Add files to the index.
144 ``files`` specifies files to be added to the index (its format is ``{'filename':
145 'file contents', ...}``). Names of files should not include the path to the
146 repository (it is prepended).
148 """
149 for filename, contents in files.items():
150 with open(Path(self.path) / filename, "w", encoding="utf-8") as h:
151 h.write(contents)
152 self._run(f"add {filename}")
154 def cm(
155 self,
156 messages: str | list[str],
157 files: Optional[DictStrStr] = None,
158 author_info: Optional[DictStrStr] = None,
159 ) -> None:
160 """Add commit(s).
162 If ``files`` is not specified an empty commit is created.
164 Note
165 -----
166 When ``messages`` is a list, multiple empty commits are created (``files``
167 cannot be specified).
169 """
171 def update_author_info(env: DictStrStr) -> DictStrStr:
172 if author_info is not None: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true
173 env["GIT_AUTHOR_NAME"] = env["GIT_COMMITTER_NAME"] = author_info["name"]
174 env["GIT_AUTHOR_EMAIL"] = env["GIT_COMMITTER_EMAIL"] = author_info[
175 "mail"
176 ]
177 if "date" in author_info:
178 env["GIT_AUTHOR_DATE"] = env["GIT_COMMITTER_DATE"] = author_info[
179 "date"
180 ]
181 return env
183 if isinstance(messages, str):
184 if files is not None:
185 self.add(files)
187 self._run(
188 f'commit --allow-empty -m "{messages}"',
189 env=update_author_info(self.env),
190 )
191 elif isinstance(messages, (list, tuple)):
192 if files is not None:
193 raise ValueError("Cannot add files with multiple commits.")
194 for msg in messages:
195 self._run(
196 f'commit --allow-empty -m "{msg}"',
197 env=update_author_info(self.env),
198 )
199 else:
200 raise ValueError("Unsupported message type.")
202 def br(
203 self,
204 branch: str,
205 create: bool = False,
206 orphan: bool = False,
207 delete: bool = False,
208 ) -> None:
209 """Create/switch/delete branch."""
210 if create and delete:
211 raise ValueError("At most one of create and delete can be True.")
213 if delete:
214 self._run(f"branch -D {branch}")
215 else:
216 create_switch = ""
217 if create:
218 create_switch = "--orphan" if orphan else "-c"
220 self._run(f"switch {create_switch} {branch}")
222 def mg(
223 self,
224 branch: str,
225 message: str = "m",
226 strategy: str = "theirs",
227 unrelated: bool = False,
228 ) -> None:
229 """Merge."""
230 if unrelated: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 flags = "--allow-unrelated-histories"
232 else:
233 flags = f"-X {strategy}"
234 self._run(f'merge {flags} {branch} -m "{message}"', env=self.env)
236 def mg_multiple(self, branches: list[str], message: str = "m") -> None:
237 """Merge multiple (possibly orphan) branches without conflicts."""
238 try:
239 self._run(
240 f'merge --allow-unrelated-histories {' '.join(branches)} -m "{message}"',
241 env=self.env,
242 )
243 except CalledProcessCustomError as e:
244 if (
245 "Automatic merge failed; fix conflicts and then commit the result"
246 in e.output.decode("utf-8")
247 ):
248 self.cm(message)
249 else:
250 raise
252 def rebase(self, branch: str) -> None:
253 """Rebase the current branch on the given branch (assuming no conflicts)."""
254 self._run(f"rebase {branch}", env=self.env)
256 def stash(
257 self,
258 files: DictStrStr,
259 title: Optional[str] = None,
260 sleep: bool = True,
261 ) -> None:
262 """Stash.
264 Note
265 -----
266 ``files`` specifies files to be modified before we stash (its format is
267 ``{'filename': 'file contents', ...}``. At least one file should be modified in
268 order for ``git stash`` to be meaningful.
270 Warning
271 --------
272 At the end of this method we sleep for 1 second otherwise stashes created very
273 fast one after another might share the "index commit" (or might not, depending
274 on delay). See https://github.com/drdv/git-dag/issues/84.
276 """
277 for filename, contents in files.items():
278 with open(Path(self.path) / filename, "w", encoding="utf-8") as h:
279 h.write(contents)
281 if title is None:
282 self._run("stash", env=self.env)
283 else:
284 self._run(f'stash push -m "{title}"', env=self.env)
286 if sleep: 286 ↛ exitline 286 didn't return from function 'stash' because the condition on line 286 was always true
287 time.sleep(1) # see Warning in docstring
289 def tag(
290 self,
291 name: str,
292 message: Optional[str] = None,
293 ref: Optional[str] = None,
294 delete: bool = False,
295 ) -> None:
296 """Create/delete annotated or lightweight tag.
298 Note
299 -----
300 When a message is specified, an annotated tag is created.
302 """
303 if message is not None and delete:
304 raise ValueError("When delete is True, message should be None.")
306 if delete:
307 self._run(f"tag -d {name}")
308 else:
309 ref_str = ref if ref is not None else ""
310 message_str = f'-m "{message}"' if message is not None else ""
311 self._run(f"tag {name} {ref_str} {message_str}", env=self.env)
313 def note(self, msg: str, ref: Optional[str] = None) -> None:
314 """Add a git note to a given ref (e.g., hash, branch name)."""
315 self._run(
316 f'notes add -m "{msg}" {ref if ref is not None else ""}',
317 env=self.env,
318 )
320 def config(self, option: str) -> None:
321 """Set a gonfig option."""
322 self._run(f"config {option}")
324 def push(self) -> None:
325 """Push."""
326 self._run("push")
328 def pull(self) -> None:
329 """Push."""
330 self._run("pull")
332 def fetch(self) -> None:
333 """Push."""
334 self._run("fetch")
336 @classmethod
337 def clone_from_local(
338 cls,
339 src_dir: Path | str,
340 target_dir: Path | str,
341 depth: Optional[int] = None,
342 ) -> None:
343 """Clone a local repository with ``--depth 1`` flag.
345 Note
346 -----
347 This command doesn't mutate a repository but appears under
348 :class:`GitCommandMutate` as it is meant to be used only in the unit tests and
349 docs examples.
351 """
352 # note that git clone sends to stderr (so I suppress it using -q)
353 depth_arg = f"--depth {depth}" if depth is not None else ""
354 cls.run_general(
355 f"git clone -q {depth_arg} file://{src_dir} {target_dir}",
356 expected_stderr="You appear to have cloned an empty repository",
357 )
359 def init_remote_head(self, remote: str = "origin", branch: str = "main") -> None:
360 """Init remote HEAD.
362 Note
363 -----
364 When we clone an empty repo the remote HEAD is not initialized (we can use this
365 method to do it manually). If we clone a repo with at least one commit on it
366 (and a reasonable setup), then the remote HEAD would be initialized upon
367 cloning.
369 """
370 self.run_general(
371 f"{self.command_prefix} symbolic-ref "
372 f"refs/remotes/{remote}/HEAD "
373 f"refs/remotes/{remote}/{branch}"
374 )
377class GitCommand(GitCommandBase):
378 """Git commands that query the repository to process (without modifications)."""
380 def get_objects_sha_kind(self) -> list[str]:
381 """Return the SHA and type of all git objects (in one string).
383 Note
384 -----
385 Unreachable commits (and deleted annotated tags) are included as well.
387 Note
388 -----
389 The ``--unordered`` flag is used because ordering by SHA is not necessary.
391 """
392 CMD = (
393 "cat-file --batch-all-objects --unordered "
394 '--batch-check="%(objectname) %(objecttype)"'
395 )
396 objects = self._run(CMD).strip().split("\n")
398 if len(objects) == 1 and not objects[0]:
399 LOG.warning("No objects")
400 return []
402 return objects
404 def read_object_file(self, sha: str) -> list[str]:
405 """Read the file associated with an object.
407 Note
408 -----
409 It is quite slow if all objects are to be read like this (``-p`` stands for
410 pretty-print).
412 """
413 return self._run(f"cat-file -p {sha}").strip().split("\n")
415 def get_remotes(self) -> list[str]:
416 """Return list of remotes."""
417 cmd_output = self._run("remote").strip().split("\n")
418 if len(cmd_output) == 1 and "" in cmd_output:
419 return []
420 return cmd_output
422 def get_fsck_unreachable_commits(self) -> list[str]:
423 """Return unreachable commits not in the reflog."""
424 cmd_output = (
425 self.run_general(
426 f"{self.command_prefix} fsck --unreachable --no-reflog 2>/dev/null | "
427 "grep commit | cut -d' ' -f3"
428 )
429 .strip()
430 .split("\n")
431 )
433 return [] if len(cmd_output) == 1 and "" in cmd_output else cmd_output
435 def get_remote_heads_sym_ref(self, remotes: list[str]) -> DictStrStr:
436 """Return symbolic references of remote heads."""
437 symb_refs = {}
438 for remote in remotes:
439 cmd = f"symbolic-ref refs/remotes/{remote}/HEAD"
440 try:
441 cmd_output = self._run(cmd).strip().split("\n")
442 # drop refs/remotes
443 symb_refs[f"{remote}/HEAD"] = "/".join(cmd_output[0].split("/")[2:])
444 except CalledProcessCustomError:
445 LOG.warning(f"HEAD not defined for {remote}.")
446 return symb_refs
448 def get_prs_heads(self) -> DictStrStr:
449 """Return heads of pull-requests."""
450 try:
451 cmd_output = self._run("ls-remote").strip().split("\n")
452 except CalledProcessCustomError as e:
453 LOG.warning(e)
454 return {}
456 out = {}
457 for line in cmd_output:
458 match = re.search(f"{SHA_PATTERN}\trefs/pull/(?P<pr_id>\\d+)/head", line)
459 if match:
460 out[match.group("pr_id")] = match.group("sha")
462 return out
464 def get_branches(self, remotes: list[str]) -> dict[str, DictStrStr]:
465 """Get local/remote branches (while excluding remote HEADs)."""
466 refs: dict[str, DictStrStr] = {"local": {}, "remote": {}}
468 try:
469 cmd_output = self._run("show-ref").strip().split("\n")
470 except CalledProcessCustomError:
471 LOG.warning("No refs")
472 return refs
474 for ref in cmd_output:
475 sha, name = ref.split()
476 if "refs/heads" in ref:
477 refs["local"]["/".join(name.split("/")[2:])] = sha
479 if "refs/remotes" in ref:
480 # skip remote HEADs (handled in GitCommand.get_remote_heads_sym_ref)
481 if name not in [f"refs/remotes/{remote}/HEAD" for remote in remotes]:
482 refs["remote"]["/".join(name.split("/")[2:])] = sha
484 return refs
486 def get_local_head_commit_sha(self) -> str:
487 """Return SHA of the commit pointed to by local HEAD."""
488 return self._run("rev-parse HEAD").strip()
490 def rev_parse_descriptors(
491 self, descriptors: Optional[list[str]]
492 ) -> Optional[list[str]]:
493 """Return a set of SHA corresponding to a list of descriptors.
495 Note
496 -----
497 A descriptor can be e.g., HEAD, main, a truncated SHA, etc.
499 """
500 if descriptors is None: 500 ↛ 501line 500 didn't jump to line 501 because the condition on line 500 was never true
501 return None
503 args = " ".join([f"'{descriptor}'" for descriptor in descriptors])
504 try:
505 return self._run(f"rev-parse {args}").strip().split("\n")
506 except CalledProcessCustomError as e:
507 LOG.warning(e)
509 return None
511 def rev_list_range(self, range_expr: Optional[str]) -> Optional[list[str]]:
512 """Return set of commit SHA in the range defined by ``range_expr``.
514 Note
515 -----
516 For example ``range_expr`` could be ``main..feature``.
518 """
519 if range_expr is None: 519 ↛ 522line 519 didn't jump to line 522 because the condition on line 519 was always true
520 return None
522 try:
523 out = self._run(f"rev-list {range_expr}").strip().split("\n")
524 return None if len(out) == 1 and not out[0] else out
525 except CalledProcessCustomError as e:
526 LOG.warning(e)
528 return None
530 def get_local_head_branch(self) -> Optional[str]:
531 """Return name of branch pointed to by HEAD."""
532 branch_name = self._run("branch --show-current").strip()
533 return branch_name if branch_name else None
535 def local_branch_is_tracking(self, local_branch_sha: str) -> Optional[str]:
536 """Detect if a local branch is tracking a remote one."""
537 try:
538 cmd = f"rev-parse --symbolic-full-name {local_branch_sha}@{ upstream} "
539 return self._run(cmd).strip()
540 except CalledProcessCustomError:
541 return None
543 def get_stash_info(self) -> Optional[list[str]]:
544 """Return stash IDs and their associated SHAs."""
545 try:
546 if not self._run("stash list").strip():
547 return None
548 except CalledProcessCustomError as e:
549 expected_error = "fatal: this operation must be run in a work tree"
550 if expected_error in e.stderr.decode("utf-8"):
551 return [] # we are in a bare repository
552 raise
554 cmd = "reflog stash --no-abbrev --format='%H %gD %gs'"
555 return self._run(cmd).strip().split("\n")
557 def rev_list(self, args: str) -> str:
558 """Return output of ``git-rev-list``.
560 Note
561 -----
562 The ``--all`` flag doesn't imply all commits but all commits reachable from
563 any reference.
565 """
566 return self._run(f"rev-list {args}")
568 def ls_tree(self, sha: str) -> list[str]:
569 """Return children of a tree object.
571 Note
572 -----
573 The default output of ``git ls-tree SHA`` is the same as
574 ``git cat-file -p SHA``. Maybe I should use the ``--object-only`` flag.
576 """
577 return self._run(f"ls-tree {sha}").strip().split("\n")
579 def get_blobs_and_trees_names(self, trees_info: dict[str, list[str]]) -> DictStrStr:
580 """Return actual names of blobs and trees.
582 Note
583 -----
584 Based on https://stackoverflow.com/a/25954360.
586 Note
587 -----
588 A tree object might have no name -- this happens when a repository has no
589 directories (note that a commit always has an associated tree object) or when a
590 tree object is created manually (without a commit). Sometimes a blob has no
591 name, e.g., when it are created manually (``git hash-object -w``) or it is not
592 referenced by a tree object.
594 """
595 cmd_out = (
596 self.run_general(
597 f"{self.command_prefix} rev-list --objects --reflog --all | "
598 f"{self.command_prefix} cat-file "
599 "--batch-check='%(objectname) %(objecttype) %(rest)' | "
600 r"grep '^[^ ]* blob\|tree' | "
601 "cut -d' ' -f1,3"
602 )
603 .strip()
604 .split("\n")
605 )
607 sha_name = {}
608 for blob_or_tree in cmd_out:
609 components = blob_or_tree.split()
610 if len(components) == 2:
611 sha_name[components[0]] = components[1]
613 # may add names of standalone trees/objects
614 for tree_info in trees_info.values():
615 for tree_or_blob in tree_info:
616 if tree_or_blob: # protect against the empty tree object
617 sha, name = tree_or_blob.split(" ")[-1].split("\t")
618 sha_name[sha] = name
620 return sha_name
622 def get_tags_info_parsed(self) -> dict[str, dict[str, DictStrStr]]:
623 """Return parsed info for all annotated and lightweight tags.
625 Note
626 -----
627 The ``git for-each-ref ...`` command (see
628 :obj:`~git_dag.constants.CMD_TAGS_INFO`) used in this function doesn't return
629 deleted annotated tags. They are handled separately in
630 :func:`GitInspector._get_objects_info_parsed` (note that their SHA is included
631 in the output of :func:`GitCommand.get_objects_sha_kind`).
633 Note
634 -----
635 The ``--python`` flag (see :obj:`~git_dag.constants.CMD_TAGS_INFO`) forms
636 groups delimited by ``'...'`` which makes them easy to split and parse. On the
637 flip-side, we have to decode escapes of escapes while preserving unicode
638 characters. Note that if the message contains ``\\n``-s (i.e., one backlash),
639 they would appear as ``\\\\\\\\n`` (four backlashes).
641 """
642 tags: dict[str, dict[str, DictStrStr]] = {"annotated": {}, "lightweight": {}}
643 for raw_tag in [
644 dict(zip(TAG_FORMAT_FIELDS, re.findall(r"'((?:[^'\\]|\\.)*)'", t)))
645 # splitlines() cannot be used here because it splits on CRLF characters
646 for t in self._run(CMD_TAGS_INFO).strip().split("\n")
647 if t # when there are no tags "".split("\n") results in [""]
648 ]:
649 if raw_tag["object"]:
650 raw_tag["anchor"] = raw_tag.pop("object")
651 raw_tag["message"] = escape_decode(raw_tag["contents"])
652 tags["annotated"][raw_tag.pop("sha")] = raw_tag # indexed by SHA
653 else:
654 raw_tag["anchor"] = raw_tag.pop("sha")
655 tags["lightweight"][raw_tag.pop("refname")] = raw_tag # indexed by name
657 return tags
659 def get_notes_dag_root(self) -> Optional[DictStrStr]:
660 """Return the root node of the DAG for git notes."""
661 notes_ref = self._run("notes get-ref").strip().split("\n")[0]
662 try:
663 notes_dag_root = self._run(f"rev-list {notes_ref}").strip().split("\n")[0]
664 except CalledProcessCustomError:
665 return None # there are no git notes
666 return {"ref": notes_ref, "root": notes_dag_root}
669class TestGitRepository:
670 """Create test git repository."""
672 @classmethod
673 def create(
674 cls,
675 label: Literal["default", "default-with-notes", "empty"],
676 repo_path: Path | str, # assumed to exist
677 tar_file_name: Optional[Path | str] = None,
678 **kwargs: dict[str, Any],
679 ) -> GitCommandMutate:
680 """Git repository creation displatch."""
681 match label:
682 case "default":
683 git = cls.repository_default(repo_path)
684 case "default-with-notes":
685 git = cls.repository_default_with_notes(repo_path)
686 case "empty":
687 git = cls.repository_empty(repo_path, **kwargs)
688 case _:
689 raise ValueError(f"Unknown repository label: {label}")
691 if tar_file_name is not None:
692 cls.tar(repo_path, tar_file_name)
694 return git
696 @staticmethod
697 def tar(src_path: Path | str, tar_file_name: Path | str) -> None:
698 """Tar a git repository."""
699 with tarfile.open(tar_file_name, "w:gz") as h:
700 h.add(src_path, arcname=".")
702 @staticmethod
703 def untar(tar_file_name: Path | str, extract_path: Path | str) -> None:
704 """Untar a git repository."""
705 with tarfile.open(tar_file_name, "r:gz") as tar:
706 tar.extractall(path=extract_path, filter="fully_trusted")
708 @staticmethod
709 def repository_empty(
710 path: Path | str,
711 files: Optional[DictStrStr] = None,
712 ) -> GitCommandMutate:
713 """Empty repository (possibly with files added to the index)."""
714 git = GitCommandMutate(path)
715 git.init()
717 if files is not None:
718 git.add(files)
720 return git
722 @staticmethod
723 def repository_default(path: Path | str) -> GitCommandMutate:
724 """Default repository."""
725 git = GitCommandMutate(path, date=COMMIT_DATE)
726 git.init()
727 git.cm("A\n\nBody:\n * First line\n * Second line\n * Third line")
728 git.br("topic", create=True)
729 git.cm("D")
730 git.br("feature", create=True)
731 git.cm("F")
732 git.cm("G", files={"file": "G"})
733 git.br("topic")
734 git.cm("E", files={"file": "E"})
735 git.mg("feature")
736 git.tag("0.1", "Summary\n\nBody:\n * First line\n * Second line\n * Third line")
737 git.tag("0.2", "Summary\n\nBody:\n * First line\n * Second line\n * Third line")
738 git.cm("H")
739 git.br("main")
740 git.cm(["B", "C"])
741 git.tag("0.7", "tag 0.7")
742 git.tag("0.7r", "ref to tag 0.7", ref="0.7")
743 git.tag("0.7rr", "ref to ref to tag 0.7", ref="0.7r")
744 git.br("feature", delete=True)
745 git.br("topic")
746 git.tag("0.3", "T1")
747 git.tag("0.4")
748 git.tag("0.5")
749 git.tag("0.1", delete=True)
750 git.tag("0.4", delete=True)
751 git.br("bugfix", create=True)
752 git.cm("I")
753 git.tag(
754 # pylint: disable=invalid-character-sub
755 "0.6",
756 "Test: €.",
757 )
758 git.cm("J")
759 git.br("topic")
760 git.br("bugfix", delete=True)
761 git.stash({"file": "stash:first"})
762 git.stash({"file": "stash:second"}, title="second")
763 git.stash({"file": "stash:third"}, title="third")
765 # add two standalone blobs and a standalone tree
766 prefix = git.command_prefix
767 git.run_general(f"echo 'test content 1' | {prefix} hash-object -w --stdin")
768 git.run_general(f"echo 'test content 2' | {prefix} hash-object -w --stdin")
770 sha = "74689c87fb53b6d666de95efea667d99ba2fa52a"
771 git.run_general(f"{prefix} update-index --add --cacheinfo 100644 {sha} tmp.txt")
772 git.run_general(f"{prefix} write-tree")
774 git.config("gc.auto 0")
776 return git
778 @classmethod
779 def repository_default_with_notes(cls, path: Path | str) -> GitCommandMutate:
780 """Default repository with git notes."""
781 git = cls.repository_default(path)
783 git.note("Add a note")
784 git.note("Add a another note", "main")
786 return git
789def create_test_repo_and_reference_dot_file(
790 path: Path | str = "test/resources/default_repo",
791) -> None:
792 """Create a git repository and its associated DOT file (to use as reference).
794 Note
795 -----
796 This is meant to be used only when the test resources should be changed. To execute:
797 ``cd src/git_dag && python git_commands.py``.
799 """
800 # pylint: disable=import-outside-toplevel
801 import shutil
803 from git_dag.git_repository import GitRepository # pylint: disable=cyclic-import
805 path = Path(path)
806 path.mkdir()
807 TestGitRepository.create("default", path)
809 repo = GitRepository(path, parse_trees=True)
810 with context_ignore_config_file():
811 params = Params(
812 public=ParamsPublic(
813 show_unreachable_commits=True,
814 show_local_branches=True,
815 show_remote_branches=True,
816 show_trees=True,
817 show_trees_standalone=True,
818 show_blobs=True,
819 show_blobs_standalone=True,
820 show_tags=True,
821 show_deleted_tags=True,
822 show_stash=True,
823 show_head=True,
824 max_numb_commits=0,
825 annotations=[
826 ["4499ee63", "just a tooltip"],
827 ["0.3", "additional info"],
828 ["0.5"], # this will not be displayed
829 ["HEAD"],
830 ["main^", "a clarification"],
831 ],
832 format="gv",
833 file=path / "../default_repo.gv",
834 )
835 )
836 repo.show(params)
838 # TestGitRepository.tar(path, path / "../default_repo.tar.gz")
840 with open(path / "../default_repo.repr", "w", encoding="utf-8") as h:
841 h.write(repr(repo))
843 shutil.rmtree(path)
846if __name__ == "__main__": # pragma: no cover
847 create_test_repo_and_reference_dot_file()