Coverage for src/git_dag/git_commands.py: 86%

360 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-08 12:49 +0200

1"""Git commands related functionality. 

2 

3Note 

4----- 

5The are two kinds of commands: such that simply read data from the repository, and such 

6that modify the repository (the latter is used only in unit tests). 

7 

8""" 

9 

10import logging 

11import re 

12import shlex 

13import subprocess 

14import tarfile 

15import time 

16from pathlib import Path 

17from typing import Any, Literal, Optional 

18 

19from git_dag.constants import ( 

20 CMD_TAGS_INFO, 

21 COMMIT_DATE, 

22 SHA_PATTERN, 

23 TAG_FORMAT_FIELDS, 

24 DictStrStr, 

25) 

26from git_dag.exceptions import CalledProcessCustomError 

27from git_dag.parameters import Params, ParamsPublic, context_ignore_config_file 

28from git_dag.utils import escape_decode, increase_date 

29 

30logging.basicConfig(level=logging.WARNING) 

31LOG = logging.getLogger(__name__) 

32 

33 

34class GitCommandBase: 

35 """Base class for git commands.""" 

36 

37 def __init__(self, path: str | Path = ".") -> None: 

38 """Initialize instance.""" 

39 self.path = path 

40 self.command_prefix = f"git -C {path}" 

41 

42 def _run( 

43 self, 

44 command: str, 

45 env: Optional[DictStrStr] = None, 

46 encoding: str = "utf-8", 

47 ) -> str: 

48 """Run a git command.""" 

49 try: 

50 return subprocess.run( 

51 shlex.split(f"{self.command_prefix} {command}"), 

52 stdout=subprocess.PIPE, 

53 stderr=subprocess.PIPE, 

54 check=True, 

55 env=env, 

56 ).stdout.decode(encoding, errors="replace") 

57 except subprocess.CalledProcessError as e: 

58 raise CalledProcessCustomError(e) from e 

59 

60 @staticmethod 

61 def run_general( 

62 command: str, 

63 env: Optional[DictStrStr] = None, 

64 encoding: str = "utf-8", 

65 expected_stderr: Optional[str] = None, 

66 ) -> str: 

67 """Run a general command.""" 

68 with subprocess.Popen( 

69 command, 

70 shell=True, 

71 stdout=subprocess.PIPE, 

72 stderr=subprocess.PIPE, 

73 env=env, 

74 ) as process: 

75 output, error = process.communicate() 

76 # some git commands output messages to stderr even when there is no error 

77 if error: 

78 if expected_stderr is None: 

79 raise RuntimeError(error) 

80 if not expected_stderr in error.decode("utf-8"): 

81 raise RuntimeError(error) 

82 return output.decode(encoding, errors="replace").strip() 

83 

84 

85class GitCommandMutate(GitCommandBase): 

86 """Git commands that create/modify a repository. 

87 

88 Warning 

89 -------- 

90 The functionality in this class is rudimentary and is used only to create a 

91 repository for the tests. 

92 

93 """ 

94 

95 def __init__( 

96 self, 

97 path: str | Path = ".", # assumed to exist 

98 author: str = "First Last <first.last@mail.com>", 

99 committer: str = "Nom Prenom <nom.prenom@mail.com>", 

100 date: Optional[str] = None, 

101 evolving_date: bool = False, 

102 ) -> None: 

103 """Initialize instance.""" 

104 self.author = author 

105 self.committer = committer 

106 self.date = date 

107 self.evolving_date = evolving_date 

108 

109 super().__init__(path) 

110 

111 def init(self, branch: str = "main", bare: bool = False) -> None: 

112 """Initialise a git repository.""" 

113 self._run(f"init -b {branch} {'--bare' if bare else ''}") 

114 

115 @property 

116 def env(self) -> DictStrStr: 

117 """Return environment with author and committer to pass to commands.""" 

118 env = {} 

119 match = re.search("(?P<name>.*) (?P<email><.*>)", self.author) 

120 if match: 

121 env["GIT_AUTHOR_NAME"] = match.group("name") 

122 env["GIT_AUTHOR_EMAIL"] = match.group("email") 

123 else: 

124 raise ValueError("Author not matched.") # pragma: no cover 

125 

126 match = re.search("(?P<name>.*) (?P<email><.*>)", self.committer) 

127 if match: 

128 env["GIT_COMMITTER_NAME"] = match.group("name") 

129 env["GIT_COMMITTER_EMAIL"] = match.group("email") 

130 else: 

131 raise ValueError("Committer not matched.") # pragma: no cover 

132 

133 if self.date is not None: 133 ↛ 139line 133 didn't jump to line 139 because the condition on line 133 was always true

134 env["GIT_AUTHOR_DATE"] = self.date 

135 env["GIT_COMMITTER_DATE"] = self.date 

136 

137 self.date = increase_date(self.date) if self.evolving_date else self.date 

138 

139 return env 

140 

141 def add(self, files: DictStrStr) -> None: 

142 """Add files to the index. 

143 

144 ``files`` specifies files to be added to the index (its format is ``{'filename': 

145 'file contents', ...}``). Names of files should not include the path to the 

146 repository (it is prepended). 

147 

148 """ 

149 for filename, contents in files.items(): 

150 with open(Path(self.path) / filename, "w", encoding="utf-8") as h: 

151 h.write(contents) 

152 self._run(f"add {filename}") 

153 

154 def cm( 

155 self, 

156 messages: str | list[str], 

157 files: Optional[DictStrStr] = None, 

158 author_info: Optional[DictStrStr] = None, 

159 ) -> None: 

160 """Add commit(s). 

161 

162 If ``files`` is not specified an empty commit is created. 

163 

164 Note 

165 ----- 

166 When ``messages`` is a list, multiple empty commits are created (``files`` 

167 cannot be specified). 

168 

169 """ 

170 

171 def update_author_info(env: DictStrStr) -> DictStrStr: 

172 if author_info is not None: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true

173 env["GIT_AUTHOR_NAME"] = env["GIT_COMMITTER_NAME"] = author_info["name"] 

174 env["GIT_AUTHOR_EMAIL"] = env["GIT_COMMITTER_EMAIL"] = author_info[ 

175 "mail" 

176 ] 

177 if "date" in author_info: 

178 env["GIT_AUTHOR_DATE"] = env["GIT_COMMITTER_DATE"] = author_info[ 

179 "date" 

180 ] 

181 return env 

182 

183 if isinstance(messages, str): 

184 if files is not None: 

185 self.add(files) 

186 

187 self._run( 

188 f'commit --allow-empty -m "{messages}"', 

189 env=update_author_info(self.env), 

190 ) 

191 elif isinstance(messages, (list, tuple)): 

192 if files is not None: 

193 raise ValueError("Cannot add files with multiple commits.") 

194 for msg in messages: 

195 self._run( 

196 f'commit --allow-empty -m "{msg}"', 

197 env=update_author_info(self.env), 

198 ) 

199 else: 

200 raise ValueError("Unsupported message type.") 

201 

202 def br( 

203 self, 

204 branch: str, 

205 create: bool = False, 

206 orphan: bool = False, 

207 delete: bool = False, 

208 ) -> None: 

209 """Create/switch/delete branch.""" 

210 if create and delete: 

211 raise ValueError("At most one of create and delete can be True.") 

212 

213 if delete: 

214 self._run(f"branch -D {branch}") 

215 else: 

216 create_switch = "" 

217 if create: 

218 create_switch = "--orphan" if orphan else "-c" 

219 

220 self._run(f"switch {create_switch} {branch}") 

221 

222 def mg( 

223 self, 

224 branch: str, 

225 message: str = "m", 

226 strategy: str = "theirs", 

227 unrelated: bool = False, 

228 ) -> None: 

229 """Merge.""" 

230 if unrelated: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true

231 flags = "--allow-unrelated-histories" 

232 else: 

233 flags = f"-X {strategy}" 

234 self._run(f'merge {flags} {branch} -m "{message}"', env=self.env) 

235 

236 def mg_multiple(self, branches: list[str], message: str = "m") -> None: 

237 """Merge multiple (possibly orphan) branches without conflicts.""" 

238 try: 

239 self._run( 

240 f'merge --allow-unrelated-histories {' '.join(branches)} -m "{message}"', 

241 env=self.env, 

242 ) 

243 except CalledProcessCustomError as e: 

244 if ( 

245 "Automatic merge failed; fix conflicts and then commit the result" 

246 in e.output.decode("utf-8") 

247 ): 

248 self.cm(message) 

249 else: 

250 raise 

251 

252 def rebase(self, branch: str) -> None: 

253 """Rebase the current branch on the given branch (assuming no conflicts).""" 

254 self._run(f"rebase {branch}", env=self.env) 

255 

256 def stash( 

257 self, 

258 files: DictStrStr, 

259 title: Optional[str] = None, 

260 sleep: bool = True, 

261 ) -> None: 

262 """Stash. 

263 

264 Note 

265 ----- 

266 ``files`` specifies files to be modified before we stash (its format is 

267 ``{'filename': 'file contents', ...}``. At least one file should be modified in 

268 order for ``git stash`` to be meaningful. 

269 

270 Warning 

271 -------- 

272 At the end of this method we sleep for 1 second otherwise stashes created very 

273 fast one after another might share the "index commit" (or might not, depending 

274 on delay). See https://github.com/drdv/git-dag/issues/84. 

275 

276 """ 

277 for filename, contents in files.items(): 

278 with open(Path(self.path) / filename, "w", encoding="utf-8") as h: 

279 h.write(contents) 

280 

281 if title is None: 

282 self._run("stash", env=self.env) 

283 else: 

284 self._run(f'stash push -m "{title}"', env=self.env) 

285 

286 if sleep: 286 ↛ exitline 286 didn't return from function 'stash' because the condition on line 286 was always true

287 time.sleep(1) # see Warning in docstring 

288 

289 def tag( 

290 self, 

291 name: str, 

292 message: Optional[str] = None, 

293 ref: Optional[str] = None, 

294 delete: bool = False, 

295 ) -> None: 

296 """Create/delete annotated or lightweight tag. 

297 

298 Note 

299 ----- 

300 When a message is specified, an annotated tag is created. 

301 

302 """ 

303 if message is not None and delete: 

304 raise ValueError("When delete is True, message should be None.") 

305 

306 if delete: 

307 self._run(f"tag -d {name}") 

308 else: 

309 ref_str = ref if ref is not None else "" 

310 message_str = f'-m "{message}"' if message is not None else "" 

311 self._run(f"tag {name} {ref_str} {message_str}", env=self.env) 

312 

313 def note(self, msg: str, ref: Optional[str] = None) -> None: 

314 """Add a git note to a given ref (e.g., hash, branch name).""" 

315 self._run( 

316 f'notes add -m "{msg}" {ref if ref is not None else ""}', 

317 env=self.env, 

318 ) 

319 

320 def config(self, option: str) -> None: 

321 """Set a gonfig option.""" 

322 self._run(f"config {option}") 

323 

324 def push(self) -> None: 

325 """Push.""" 

326 self._run("push") 

327 

328 def pull(self) -> None: 

329 """Push.""" 

330 self._run("pull") 

331 

332 def fetch(self) -> None: 

333 """Push.""" 

334 self._run("fetch") 

335 

336 @classmethod 

337 def clone_from_local( 

338 cls, 

339 src_dir: Path | str, 

340 target_dir: Path | str, 

341 depth: Optional[int] = None, 

342 ) -> None: 

343 """Clone a local repository with ``--depth 1`` flag. 

344 

345 Note 

346 ----- 

347 This command doesn't mutate a repository but appears under 

348 :class:`GitCommandMutate` as it is meant to be used only in the unit tests and 

349 docs examples. 

350 

351 """ 

352 # note that git clone sends to stderr (so I suppress it using -q) 

353 depth_arg = f"--depth {depth}" if depth is not None else "" 

354 cls.run_general( 

355 f"git clone -q {depth_arg} file://{src_dir} {target_dir}", 

356 expected_stderr="You appear to have cloned an empty repository", 

357 ) 

358 

359 def init_remote_head(self, remote: str = "origin", branch: str = "main") -> None: 

360 """Init remote HEAD. 

361 

362 Note 

363 ----- 

364 When we clone an empty repo the remote HEAD is not initialized (we can use this 

365 method to do it manually). If we clone a repo with at least one commit on it 

366 (and a reasonable setup), then the remote HEAD would be initialized upon 

367 cloning. 

368 

369 """ 

370 self.run_general( 

371 f"{self.command_prefix} symbolic-ref " 

372 f"refs/remotes/{remote}/HEAD " 

373 f"refs/remotes/{remote}/{branch}" 

374 ) 

375 

376 

377class GitCommand(GitCommandBase): 

378 """Git commands that query the repository to process (without modifications).""" 

379 

380 def get_objects_sha_kind(self) -> list[str]: 

381 """Return the SHA and type of all git objects (in one string). 

382 

383 Note 

384 ----- 

385 Unreachable commits (and deleted annotated tags) are included as well. 

386 

387 Note 

388 ----- 

389 The ``--unordered`` flag is used because ordering by SHA is not necessary. 

390 

391 """ 

392 CMD = ( 

393 "cat-file --batch-all-objects --unordered " 

394 '--batch-check="%(objectname) %(objecttype)"' 

395 ) 

396 objects = self._run(CMD).strip().split("\n") 

397 

398 if len(objects) == 1 and not objects[0]: 

399 LOG.warning("No objects") 

400 return [] 

401 

402 return objects 

403 

404 def read_object_file(self, sha: str) -> list[str]: 

405 """Read the file associated with an object. 

406 

407 Note 

408 ----- 

409 It is quite slow if all objects are to be read like this (``-p`` stands for 

410 pretty-print). 

411 

412 """ 

413 return self._run(f"cat-file -p {sha}").strip().split("\n") 

414 

415 def get_remotes(self) -> list[str]: 

416 """Return list of remotes.""" 

417 cmd_output = self._run("remote").strip().split("\n") 

418 if len(cmd_output) == 1 and "" in cmd_output: 

419 return [] 

420 return cmd_output 

421 

422 def get_fsck_unreachable_commits(self) -> list[str]: 

423 """Return unreachable commits not in the reflog.""" 

424 cmd_output = ( 

425 self.run_general( 

426 f"{self.command_prefix} fsck --unreachable --no-reflog 2>/dev/null | " 

427 "grep commit | cut -d' ' -f3" 

428 ) 

429 .strip() 

430 .split("\n") 

431 ) 

432 

433 return [] if len(cmd_output) == 1 and "" in cmd_output else cmd_output 

434 

435 def get_remote_heads_sym_ref(self, remotes: list[str]) -> DictStrStr: 

436 """Return symbolic references of remote heads.""" 

437 symb_refs = {} 

438 for remote in remotes: 

439 cmd = f"symbolic-ref refs/remotes/{remote}/HEAD" 

440 try: 

441 cmd_output = self._run(cmd).strip().split("\n") 

442 # drop refs/remotes 

443 symb_refs[f"{remote}/HEAD"] = "/".join(cmd_output[0].split("/")[2:]) 

444 except CalledProcessCustomError: 

445 LOG.warning(f"HEAD not defined for {remote}.") 

446 return symb_refs 

447 

448 def get_prs_heads(self) -> DictStrStr: 

449 """Return heads of pull-requests.""" 

450 try: 

451 cmd_output = self._run("ls-remote").strip().split("\n") 

452 except CalledProcessCustomError as e: 

453 LOG.warning(e) 

454 return {} 

455 

456 out = {} 

457 for line in cmd_output: 

458 match = re.search(f"{SHA_PATTERN}\trefs/pull/(?P<pr_id>\\d+)/head", line) 

459 if match: 

460 out[match.group("pr_id")] = match.group("sha") 

461 

462 return out 

463 

464 def get_branches(self, remotes: list[str]) -> dict[str, DictStrStr]: 

465 """Get local/remote branches (while excluding remote HEADs).""" 

466 refs: dict[str, DictStrStr] = {"local": {}, "remote": {}} 

467 

468 try: 

469 cmd_output = self._run("show-ref").strip().split("\n") 

470 except CalledProcessCustomError: 

471 LOG.warning("No refs") 

472 return refs 

473 

474 for ref in cmd_output: 

475 sha, name = ref.split() 

476 if "refs/heads" in ref: 

477 refs["local"]["/".join(name.split("/")[2:])] = sha 

478 

479 if "refs/remotes" in ref: 

480 # skip remote HEADs (handled in GitCommand.get_remote_heads_sym_ref) 

481 if name not in [f"refs/remotes/{remote}/HEAD" for remote in remotes]: 

482 refs["remote"]["/".join(name.split("/")[2:])] = sha 

483 

484 return refs 

485 

486 def get_local_head_commit_sha(self) -> str: 

487 """Return SHA of the commit pointed to by local HEAD.""" 

488 return self._run("rev-parse HEAD").strip() 

489 

490 def rev_parse_descriptors( 

491 self, descriptors: Optional[list[str]] 

492 ) -> Optional[list[str]]: 

493 """Return a set of SHA corresponding to a list of descriptors. 

494 

495 Note 

496 ----- 

497 A descriptor can be e.g., HEAD, main, a truncated SHA, etc. 

498 

499 """ 

500 if descriptors is None: 500 ↛ 501line 500 didn't jump to line 501 because the condition on line 500 was never true

501 return None 

502 

503 args = " ".join([f"'{descriptor}'" for descriptor in descriptors]) 

504 try: 

505 return self._run(f"rev-parse {args}").strip().split("\n") 

506 except CalledProcessCustomError as e: 

507 LOG.warning(e) 

508 

509 return None 

510 

511 def rev_list_range(self, range_expr: Optional[str]) -> Optional[list[str]]: 

512 """Return set of commit SHA in the range defined by ``range_expr``. 

513 

514 Note 

515 ----- 

516 For example ``range_expr`` could be ``main..feature``. 

517 

518 """ 

519 if range_expr is None: 519 ↛ 522line 519 didn't jump to line 522 because the condition on line 519 was always true

520 return None 

521 

522 try: 

523 out = self._run(f"rev-list {range_expr}").strip().split("\n") 

524 return None if len(out) == 1 and not out[0] else out 

525 except CalledProcessCustomError as e: 

526 LOG.warning(e) 

527 

528 return None 

529 

530 def get_local_head_branch(self) -> Optional[str]: 

531 """Return name of branch pointed to by HEAD.""" 

532 branch_name = self._run("branch --show-current").strip() 

533 return branch_name if branch_name else None 

534 

535 def local_branch_is_tracking(self, local_branch_sha: str) -> Optional[str]: 

536 """Detect if a local branch is tracking a remote one.""" 

537 try: 

538 cmd = f"rev-parse --symbolic-full-name {local_branch_sha}@{ upstream} " 

539 return self._run(cmd).strip() 

540 except CalledProcessCustomError: 

541 return None 

542 

543 def get_stash_info(self) -> Optional[list[str]]: 

544 """Return stash IDs and their associated SHAs.""" 

545 try: 

546 if not self._run("stash list").strip(): 

547 return None 

548 except CalledProcessCustomError as e: 

549 expected_error = "fatal: this operation must be run in a work tree" 

550 if expected_error in e.stderr.decode("utf-8"): 

551 return [] # we are in a bare repository 

552 raise 

553 

554 cmd = "reflog stash --no-abbrev --format='%H %gD %gs'" 

555 return self._run(cmd).strip().split("\n") 

556 

557 def rev_list(self, args: str) -> str: 

558 """Return output of ``git-rev-list``. 

559 

560 Note 

561 ----- 

562 The ``--all`` flag doesn't imply all commits but all commits reachable from 

563 any reference. 

564 

565 """ 

566 return self._run(f"rev-list {args}") 

567 

568 def ls_tree(self, sha: str) -> list[str]: 

569 """Return children of a tree object. 

570 

571 Note 

572 ----- 

573 The default output of ``git ls-tree SHA`` is the same as 

574 ``git cat-file -p SHA``. Maybe I should use the ``--object-only`` flag. 

575 

576 """ 

577 return self._run(f"ls-tree {sha}").strip().split("\n") 

578 

579 def get_blobs_and_trees_names(self, trees_info: dict[str, list[str]]) -> DictStrStr: 

580 """Return actual names of blobs and trees. 

581 

582 Note 

583 ----- 

584 Based on https://stackoverflow.com/a/25954360. 

585 

586 Note 

587 ----- 

588 A tree object might have no name -- this happens when a repository has no 

589 directories (note that a commit always has an associated tree object) or when a 

590 tree object is created manually (without a commit). Sometimes a blob has no 

591 name, e.g., when it are created manually (``git hash-object -w``) or it is not 

592 referenced by a tree object. 

593 

594 """ 

595 cmd_out = ( 

596 self.run_general( 

597 f"{self.command_prefix} rev-list --objects --reflog --all | " 

598 f"{self.command_prefix} cat-file " 

599 "--batch-check='%(objectname) %(objecttype) %(rest)' | " 

600 r"grep '^[^ ]* blob\|tree' | " 

601 "cut -d' ' -f1,3" 

602 ) 

603 .strip() 

604 .split("\n") 

605 ) 

606 

607 sha_name = {} 

608 for blob_or_tree in cmd_out: 

609 components = blob_or_tree.split() 

610 if len(components) == 2: 

611 sha_name[components[0]] = components[1] 

612 

613 # may add names of standalone trees/objects 

614 for tree_info in trees_info.values(): 

615 for tree_or_blob in tree_info: 

616 if tree_or_blob: # protect against the empty tree object 

617 sha, name = tree_or_blob.split(" ")[-1].split("\t") 

618 sha_name[sha] = name 

619 

620 return sha_name 

621 

622 def get_tags_info_parsed(self) -> dict[str, dict[str, DictStrStr]]: 

623 """Return parsed info for all annotated and lightweight tags. 

624 

625 Note 

626 ----- 

627 The ``git for-each-ref ...`` command (see 

628 :obj:`~git_dag.constants.CMD_TAGS_INFO`) used in this function doesn't return 

629 deleted annotated tags. They are handled separately in 

630 :func:`GitInspector._get_objects_info_parsed` (note that their SHA is included 

631 in the output of :func:`GitCommand.get_objects_sha_kind`). 

632 

633 Note 

634 ----- 

635 The ``--python`` flag (see :obj:`~git_dag.constants.CMD_TAGS_INFO`) forms 

636 groups delimited by ``'...'`` which makes them easy to split and parse. On the 

637 flip-side, we have to decode escapes of escapes while preserving unicode 

638 characters. Note that if the message contains ``\\n``-s (i.e., one backlash), 

639 they would appear as ``\\\\\\\\n`` (four backlashes). 

640 

641 """ 

642 tags: dict[str, dict[str, DictStrStr]] = {"annotated": {}, "lightweight": {}} 

643 for raw_tag in [ 

644 dict(zip(TAG_FORMAT_FIELDS, re.findall(r"'((?:[^'\\]|\\.)*)'", t))) 

645 # splitlines() cannot be used here because it splits on CRLF characters 

646 for t in self._run(CMD_TAGS_INFO).strip().split("\n") 

647 if t # when there are no tags "".split("\n") results in [""] 

648 ]: 

649 if raw_tag["object"]: 

650 raw_tag["anchor"] = raw_tag.pop("object") 

651 raw_tag["message"] = escape_decode(raw_tag["contents"]) 

652 tags["annotated"][raw_tag.pop("sha")] = raw_tag # indexed by SHA 

653 else: 

654 raw_tag["anchor"] = raw_tag.pop("sha") 

655 tags["lightweight"][raw_tag.pop("refname")] = raw_tag # indexed by name 

656 

657 return tags 

658 

659 def get_notes_dag_root(self) -> Optional[DictStrStr]: 

660 """Return the root node of the DAG for git notes.""" 

661 notes_ref = self._run("notes get-ref").strip().split("\n")[0] 

662 try: 

663 notes_dag_root = self._run(f"rev-list {notes_ref}").strip().split("\n")[0] 

664 except CalledProcessCustomError: 

665 return None # there are no git notes 

666 return {"ref": notes_ref, "root": notes_dag_root} 

667 

668 

669class TestGitRepository: 

670 """Create test git repository.""" 

671 

672 @classmethod 

673 def create( 

674 cls, 

675 label: Literal["default", "default-with-notes", "empty"], 

676 repo_path: Path | str, # assumed to exist 

677 tar_file_name: Optional[Path | str] = None, 

678 **kwargs: dict[str, Any], 

679 ) -> GitCommandMutate: 

680 """Git repository creation displatch.""" 

681 match label: 

682 case "default": 

683 git = cls.repository_default(repo_path) 

684 case "default-with-notes": 

685 git = cls.repository_default_with_notes(repo_path) 

686 case "empty": 

687 git = cls.repository_empty(repo_path, **kwargs) 

688 case _: 

689 raise ValueError(f"Unknown repository label: {label}") 

690 

691 if tar_file_name is not None: 

692 cls.tar(repo_path, tar_file_name) 

693 

694 return git 

695 

696 @staticmethod 

697 def tar(src_path: Path | str, tar_file_name: Path | str) -> None: 

698 """Tar a git repository.""" 

699 with tarfile.open(tar_file_name, "w:gz") as h: 

700 h.add(src_path, arcname=".") 

701 

702 @staticmethod 

703 def untar(tar_file_name: Path | str, extract_path: Path | str) -> None: 

704 """Untar a git repository.""" 

705 with tarfile.open(tar_file_name, "r:gz") as tar: 

706 tar.extractall(path=extract_path, filter="fully_trusted") 

707 

708 @staticmethod 

709 def repository_empty( 

710 path: Path | str, 

711 files: Optional[DictStrStr] = None, 

712 ) -> GitCommandMutate: 

713 """Empty repository (possibly with files added to the index).""" 

714 git = GitCommandMutate(path) 

715 git.init() 

716 

717 if files is not None: 

718 git.add(files) 

719 

720 return git 

721 

722 @staticmethod 

723 def repository_default(path: Path | str) -> GitCommandMutate: 

724 """Default repository.""" 

725 git = GitCommandMutate(path, date=COMMIT_DATE) 

726 git.init() 

727 git.cm("A\n\nBody:\n * First line\n * Second line\n * Third line") 

728 git.br("topic", create=True) 

729 git.cm("D") 

730 git.br("feature", create=True) 

731 git.cm("F") 

732 git.cm("G", files={"file": "G"}) 

733 git.br("topic") 

734 git.cm("E", files={"file": "E"}) 

735 git.mg("feature") 

736 git.tag("0.1", "Summary\n\nBody:\n * First line\n * Second line\n * Third line") 

737 git.tag("0.2", "Summary\n\nBody:\n * First line\n * Second line\n * Third line") 

738 git.cm("H") 

739 git.br("main") 

740 git.cm(["B", "C"]) 

741 git.tag("0.7", "tag 0.7") 

742 git.tag("0.7r", "ref to tag 0.7", ref="0.7") 

743 git.tag("0.7rr", "ref to ref to tag 0.7", ref="0.7r") 

744 git.br("feature", delete=True) 

745 git.br("topic") 

746 git.tag("0.3", "T1") 

747 git.tag("0.4") 

748 git.tag("0.5") 

749 git.tag("0.1", delete=True) 

750 git.tag("0.4", delete=True) 

751 git.br("bugfix", create=True) 

752 git.cm("I") 

753 git.tag( 

754 # pylint: disable=invalid-character-sub 

755 "0.6", 

756 "Test:                    €.", 

757 ) 

758 git.cm("J") 

759 git.br("topic") 

760 git.br("bugfix", delete=True) 

761 git.stash({"file": "stash:first"}) 

762 git.stash({"file": "stash:second"}, title="second") 

763 git.stash({"file": "stash:third"}, title="third") 

764 

765 # add two standalone blobs and a standalone tree 

766 prefix = git.command_prefix 

767 git.run_general(f"echo 'test content 1' | {prefix} hash-object -w --stdin") 

768 git.run_general(f"echo 'test content 2' | {prefix} hash-object -w --stdin") 

769 

770 sha = "74689c87fb53b6d666de95efea667d99ba2fa52a" 

771 git.run_general(f"{prefix} update-index --add --cacheinfo 100644 {sha} tmp.txt") 

772 git.run_general(f"{prefix} write-tree") 

773 

774 git.config("gc.auto 0") 

775 

776 return git 

777 

778 @classmethod 

779 def repository_default_with_notes(cls, path: Path | str) -> GitCommandMutate: 

780 """Default repository with git notes.""" 

781 git = cls.repository_default(path) 

782 

783 git.note("Add a note") 

784 git.note("Add a another note", "main") 

785 

786 return git 

787 

788 

789def create_test_repo_and_reference_dot_file( 

790 path: Path | str = "test/resources/default_repo", 

791) -> None: 

792 """Create a git repository and its associated DOT file (to use as reference). 

793 

794 Note 

795 ----- 

796 This is meant to be used only when the test resources should be changed. To execute: 

797 ``cd src/git_dag && python git_commands.py``. 

798 

799 """ 

800 # pylint: disable=import-outside-toplevel 

801 import shutil 

802 

803 from git_dag.git_repository import GitRepository # pylint: disable=cyclic-import 

804 

805 path = Path(path) 

806 path.mkdir() 

807 TestGitRepository.create("default", path) 

808 

809 repo = GitRepository(path, parse_trees=True) 

810 with context_ignore_config_file(): 

811 params = Params( 

812 public=ParamsPublic( 

813 show_unreachable_commits=True, 

814 show_local_branches=True, 

815 show_remote_branches=True, 

816 show_trees=True, 

817 show_trees_standalone=True, 

818 show_blobs=True, 

819 show_blobs_standalone=True, 

820 show_tags=True, 

821 show_deleted_tags=True, 

822 show_stash=True, 

823 show_head=True, 

824 max_numb_commits=0, 

825 annotations=[ 

826 ["4499ee63", "just a tooltip"], 

827 ["0.3", "additional info"], 

828 ["0.5"], # this will not be displayed 

829 ["HEAD"], 

830 ["main^", "a clarification"], 

831 ], 

832 format="gv", 

833 file=path / "../default_repo.gv", 

834 ) 

835 ) 

836 repo.show(params) 

837 

838 # TestGitRepository.tar(path, path / "../default_repo.tar.gz") 

839 

840 with open(path / "../default_repo.repr", "w", encoding="utf-8") as h: 

841 h.write(repr(repo)) 

842 

843 shutil.rmtree(path) 

844 

845 

846if __name__ == "__main__": # pragma: no cover 

847 create_test_repo_and_reference_dot_file()