Coverage for src/git_dag/dag.py: 87%

227 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-08 12:49 +0200

1"""DAG visualization. 

2 

3Note 

4----- 

5The edge between two git objects points towards the parent object. I consider a commit 

6to be the child of its associated tree (because it is formed from it) and this tree is a 

7child of its blobs (and trees). A commit is the parent of a tag (that points to it). 

8 

9""" 

10 

11from __future__ import annotations 

12 

13import subprocess 

14from dataclasses import dataclass 

15from pathlib import Path 

16from typing import TYPE_CHECKING, Any, Optional, Protocol 

17 

18from .constants import ( 

19 GIT_EMPTY_TREE_OBJECT_SHA, 

20 HTML_EMBED_SVG, 

21 DagBackends, 

22 DictStrStr, 

23) 

24from .git_objects import GitBlob, GitCommit, GitTag, GitTree 

25from .interfaces.graphviz import DagGraphviz 

26from .parameters import Params 

27from .utils import transform_ascii_control_chars 

28 

29if TYPE_CHECKING: # pragma: no cover 

30 from .git_repository import GitRepository 

31 

32 

33class MixinProtocol(Protocol): 

34 """Mixin protocol.""" 

35 

36 repository: GitRepository 

37 params: Params 

38 objects_sha_to_include: Optional[set[str]] 

39 dag: Any 

40 included_nodes_id: set[str] 

41 tooltip_names: DictStrStr 

42 in_range_commits: Optional[list[str]] 

43 

44 def _is_object_to_include(self, sha: str) -> bool: ... # pragma: no cover 

45 def _is_tag_to_include(self, item: GitTag) -> bool: ... # pragma: no cover 

46 def _add_notes_label_node(self, sha: str, ref: str) -> None: ... # pragma: no cover 

47 

48 

49class CommitHandlerMixin: 

50 """Handle commits.""" 

51 

52 def _add_notes_label_node(self: MixinProtocol, sha: str, ref: str) -> None: 

53 """Add a node that labels the root of the git notes DAG.""" 

54 self.dag.node( 

55 name="GIT-NOTES-LABEL", 

56 label="git notes", 

57 fillcolor=self.params.dag_node_colors.notes, 

58 tooltip=ref, 

59 shape="egg", 

60 ) 

61 self.dag.edge("GIT-NOTES-LABEL", sha) 

62 

63 def _add_commit(self: MixinProtocol, sha: str, item: GitCommit) -> None: 

64 def form_tooltip(item: GitCommit, sha: Optional[str] = None) -> str: 

65 sha_field = "" if sha is None else f"sha: {sha}\n\n" 

66 return repr( 

67 f"{sha_field}" 

68 f"author: {item.author} {item.author_email}\n" 

69 f"{item.author_date}\n" 

70 f"committer: {item.committer} {item.committer_email}\n" 

71 f"{item.committer_date}\n\n" 

72 f"{transform_ascii_control_chars(item.message)}" 

73 )[1:-1] 

74 

75 unreachable_switch = ( 

76 item.is_reachable or self.params.public.show_unreachable_commits 

77 ) 

78 

79 if self._is_object_to_include(sha) and unreachable_switch: 

80 self.included_nodes_id.add(sha) 

81 color_label = "commit" if item.is_reachable else "commit_unreachable" 

82 is_in_range = ( 

83 self.in_range_commits is not None and sha in self.in_range_commits 

84 ) 

85 

86 if self.params.public.commit_message_as_label > 0: 

87 tooltip_sha = sha 

88 label = item.message[: self.params.public.commit_message_as_label] 

89 else: 

90 tooltip_sha = None 

91 label = sha[: self.params.misc.sha_truncate] 

92 

93 color = getattr(self.params.dag_node_colors, color_label) 

94 self.dag.node( 

95 name=sha, 

96 label=label, 

97 color=( 

98 self.params.dag_node_colors.commit_in_range 

99 if is_in_range 

100 else color 

101 ), 

102 fillcolor=color, 

103 tooltip=form_tooltip(item, tooltip_sha), 

104 ) 

105 

106 if self.repository.notes_dag_root is not None: 

107 if sha == self.repository.notes_dag_root["root"]: 

108 self._add_notes_label_node( 

109 sha, 

110 self.repository.notes_dag_root["ref"], 

111 ) 

112 

113 if self.params.public.show_trees: 

114 self.dag.edge(sha, item.tree.sha) 

115 

116 for parent in item.parents: 

117 if self._is_object_to_include(parent.sha): 117 ↛ 116line 117 didn't jump to line 116 because the condition on line 117 was always true

118 self.dag.edge(sha, parent.sha) 

119 

120 

121class TreeBlobHandlerMixin: 

122 """Handle trees and blobs.""" 

123 

124 def _add_tree( 

125 self: MixinProtocol, 

126 sha: str, 

127 item: GitTree, 

128 standalone: bool = False, 

129 ) -> None: 

130 self.included_nodes_id.add(sha) 

131 if sha == GIT_EMPTY_TREE_OBJECT_SHA: 

132 color_label = "the_empty_tree" 

133 tooltip = f"THE EMPTY TREE\n{GIT_EMPTY_TREE_OBJECT_SHA}" 

134 else: 

135 color_label = "tree" 

136 tooltip = self.tooltip_names.get(sha, sha) 

137 

138 color = getattr(self.params.dag_node_colors, color_label) 

139 self.dag.node( 

140 name=sha, 

141 label=sha[: self.params.misc.sha_truncate], 

142 color=color, 

143 fillcolor=color, 

144 shape="folder", 

145 tooltip=tooltip, 

146 standalone_kind="tree" if standalone else None, 

147 ) 

148 

149 for child in item.children: 

150 match child: 

151 case GitTree(): 151 ↛ 152line 151 didn't jump to line 152 because the pattern on line 151 never matched

152 self.dag.edge(sha, child.sha) 

153 case GitBlob(): 153 ↛ 149line 153 didn't jump to line 149 because the pattern on line 153 always matched

154 if self.params.public.show_blobs: 154 ↛ 149line 154 didn't jump to line 149 because the condition on line 154 was always true

155 self.dag.edge(sha, child.sha) 

156 

157 def _add_blob(self: MixinProtocol, sha: str, standalone: bool = False) -> None: 

158 self.included_nodes_id.add(sha) 

159 self.dag.node( 

160 name=sha, 

161 label=sha[: self.params.misc.sha_truncate], 

162 color=self.params.dag_node_colors.blob, 

163 fillcolor=self.params.dag_node_colors.blob, 

164 shape="note", 

165 tooltip=self.tooltip_names.get(sha, sha), 

166 standalone_kind="blob" if standalone else None, 

167 ) 

168 

169 

170class TagHandlerMixin: 

171 """Handle tags.""" 

172 

173 def _is_tag_to_include(self: MixinProtocol, item: GitTag) -> bool: 

174 """Check if an annotated tag should be displayed. 

175 

176 Note 

177 ----- 

178 Lightweight tags cannot point to other tags or be pointed by annotated tags. 

179 

180 """ 

181 while isinstance(item.anchor, GitTag): 

182 item = item.anchor 

183 return item.anchor.sha in self.included_nodes_id 

184 

185 def _add_annotated_tags(self: MixinProtocol) -> None: 

186 def form_tooltip(item: GitTag) -> str: 

187 return repr( 

188 f"{item.tagger} {item.tagger_email}\n" 

189 f"{item.tagger_date}\n\n" 

190 f"{transform_ascii_control_chars(item.message)}" 

191 )[1:-1] 

192 

193 for sha, item in self.repository.tags.items(): 

194 if self._is_tag_to_include(item): 

195 if self.params.public.show_deleted_tags or not item.is_deleted: 195 ↛ 193line 195 didn't jump to line 193 because the condition on line 195 was always true

196 self.included_nodes_id.add(sha) 

197 color_label = "tag_deleted" if item.is_deleted else "tag" 

198 color = getattr(self.params.dag_node_colors, color_label) 

199 self.dag.node( 

200 name=sha, 

201 label=item.name, 

202 color=color, 

203 fillcolor=color, 

204 tooltip=form_tooltip(item), 

205 ) 

206 self.dag.edge(sha, item.anchor.sha) 

207 

208 def _add_lightweight_tags(self: MixinProtocol) -> None: 

209 for name, item in self.repository.tags_lw.items(): 

210 if self._is_object_to_include(item.anchor.sha): 

211 node_id = f"lwt-{name}-{item.anchor.sha}" 

212 self.dag.node( 

213 name=node_id, 

214 label=name, 

215 color=self.params.dag_node_colors.tag_lw, 

216 fillcolor=self.params.dag_node_colors.tag_lw, 

217 tooltip=item.anchor.sha, 

218 ) 

219 if item.anchor.sha in self.included_nodes_id: 219 ↛ 209line 219 didn't jump to line 209 because the condition on line 219 was always true

220 self.dag.edge(node_id, item.anchor.sha) 

221 

222 

223class StashHandlerMixin: 

224 """Handle stash.""" 

225 

226 def _add_stashes(self: MixinProtocol) -> None: 

227 for stash in self.repository.stashes: 

228 if self._is_object_to_include(stash.commit.sha): 

229 stash_id = f"stash-{stash.index}" 

230 self.dag.node( 

231 name=stash_id, 

232 label=f"stash:{stash.index}", 

233 color=self.params.dag_node_colors.stash, 

234 fillcolor=self.params.dag_node_colors.stash, 

235 tooltip=stash.title, 

236 ) 

237 if ( 237 ↛ 227line 237 didn't jump to line 227 because the condition on line 237 was always true

238 self.params.public.show_unreachable_commits 

239 or stash.commit.is_reachable 

240 ): 

241 self.dag.edge(stash_id, stash.commit.sha) 

242 

243 

244class BranchHandlerMixin: 

245 """Handle branches.""" 

246 

247 def _add_local_branches(self: MixinProtocol) -> None: 

248 local_branches = [b for b in self.repository.branches if b.is_local] 

249 for branch in local_branches: 

250 if self._is_object_to_include(branch.commit.sha): 

251 node_id = f"local-branch-{branch.name}" 

252 self.dag.node( 

253 name=node_id, 

254 label=branch.name, 

255 color=self.params.dag_node_colors.local_branches, 

256 fillcolor=self.params.dag_node_colors.local_branches, 

257 tooltip=f"-> {branch.tracking}", 

258 ) 

259 self.dag.edge(node_id, branch.commit.sha) 

260 

261 def _add_remote_branches(self: MixinProtocol) -> None: 

262 remote_branches = [b for b in self.repository.branches if not b.is_local] 

263 for branch in remote_branches: 263 ↛ 264line 263 didn't jump to line 264 because the loop on line 263 never started

264 if self._is_object_to_include(branch.commit.sha): 

265 node_id = f"remote-branch-{branch.name}" 

266 self.dag.node( 

267 name=node_id, 

268 label=branch.name, 

269 color=self.params.dag_node_colors.remote_branches, 

270 fillcolor=self.params.dag_node_colors.remote_branches, 

271 ) 

272 self.dag.edge(node_id, branch.commit.sha) 

273 

274 

275class HeadHandlerMixin: 

276 """Handle HEAD.""" 

277 

278 def _add_local_head(self: MixinProtocol) -> None: 

279 head = self.repository.head 

280 if head.is_defined: 280 ↛ exitline 280 didn't return from function '_add_local_head' because the condition on line 280 was always true

281 assert head.commit is not None # to make mypy happy 

282 if self._is_object_to_include(head.commit.sha): 

283 if head.is_detached: 283 ↛ 284line 283 didn't jump to line 284 because the condition on line 283 was never true

284 self.dag.edge("HEAD", head.commit.sha) 

285 tooltip = head.commit.sha 

286 else: 

287 assert head.branch is not None # to make mypy happy 

288 self.dag.edge("HEAD", f"local-branch-{head.branch.name}") 

289 tooltip = head.branch.name 

290 

291 color = self.params.dag_node_colors.head 

292 self.dag.node( 

293 name="HEAD", 

294 label="HEAD", 

295 color=None if head.is_detached else color, 

296 fillcolor=color, 

297 tooltip=tooltip, 

298 ) 

299 

300 def _add_remote_heads(self: MixinProtocol) -> None: 

301 for head, ref in self.repository.remote_heads.items(): 301 ↛ 302line 301 didn't jump to line 302 because the loop on line 301 never started

302 self.dag.node( 

303 name=head, 

304 label=head, 

305 color=self.params.dag_node_colors.head, 

306 fillcolor=self.params.dag_node_colors.head, 

307 tooltip=ref, 

308 ) 

309 self.dag.edge(head, f"remote-branch-{ref}") 

310 

311 def _add_prs_heads(self: MixinProtocol) -> None: 

312 """Add pull-request heads.""" 

313 if self.params.public.show_prs_heads: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true

314 prs_heads = self.repository.inspector.git.get_prs_heads() 

315 if prs_heads is not None: 

316 for pr_id, sha in prs_heads.items(): 

317 if sha in self.included_nodes_id: 

318 node_name = f"PR_{pr_id}_HEAD" 

319 self.dag.node( 

320 name=node_name, 

321 label=pr_id, 

322 color=self.params.dag_node_colors.head, 

323 fillcolor=self.params.dag_node_colors.head, 

324 shape="circle", 

325 ) 

326 self.dag.edge(node_name, sha) 

327 

328 def _add_annotations(self: MixinProtocol) -> None: 

329 if self.params.public.annotations is not None: 

330 for annotation in self.params.public.annotations: 

331 descriptor = annotation[0].strip() 

332 shas = self.repository.inspector.git.rev_parse_descriptors([descriptor]) 

333 

334 if shas is None: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true

335 continue 

336 

337 sha = shas[0] 

338 if descriptor in sha or isinstance( 

339 self.repository.objects[sha], GitTag 

340 ): 

341 tooltip = ( 

342 None # the annotation will not be displayed at all 

343 if len(annotation) == 1 

344 else " ".join(annotation[1:]) 

345 ) 

346 label = self.params.misc.annotations_symbol 

347 shape = self.params.misc.annotations_shape 

348 else: 

349 tooltip = ( 

350 descriptor if len(annotation) == 1 else " ".join(annotation[1:]) 

351 ) 

352 label = descriptor 

353 shape = None 

354 

355 if sha in self.included_nodes_id and tooltip is not None: 355 ↛ 330line 355 didn't jump to line 330 because the condition on line 355 was always true

356 # colon in node name not supported by graphviz 

357 name = f"annotation-{descriptor.replace(":", "=")}" 

358 self.dag.node( 

359 name=name, 

360 label=label[: self.params.misc.annotations_truncate], 

361 color=self.params.dag_node_colors.annotations, 

362 fillcolor=self.params.dag_node_colors.annotations, 

363 tooltip=tooltip, 

364 shape=shape, 

365 ) 

366 self.dag.edge(name, sha, style="dashed") 

367 

368 

369@dataclass 

370class DagVisualizer( 

371 CommitHandlerMixin, 

372 TreeBlobHandlerMixin, 

373 TagHandlerMixin, 

374 StashHandlerMixin, 

375 BranchHandlerMixin, 

376 HeadHandlerMixin, 

377): 

378 """Git DAG visualizer.""" 

379 

380 repository: GitRepository 

381 params: Params 

382 objects_sha_to_include: Optional[set[str]] = None 

383 in_range_commits: Optional[list[str]] = None 

384 

385 def __post_init__(self) -> None: 

386 self.tooltip_names = self.repository.inspector.blobs_and_trees_names 

387 self.included_nodes_id: set[str] = set() 

388 

389 match DagBackends[self.params.public.dag_backend.upper()]: 

390 case DagBackends.GRAPHVIZ: 390 ↛ 395line 390 didn't jump to line 395 because the pattern on line 390 always matched

391 self.dag = DagGraphviz( 

392 self.params.public.show_blobs_standalone 

393 or self.params.public.show_trees_standalone 

394 ) 

395 case _: 

396 raise ValueError( 

397 f"Unrecognised backend: {self.params.public.dag_backend}" 

398 ) 

399 

400 self._build_dag() 

401 

402 @staticmethod 

403 def _embed_svg_in_html(filename: str) -> None: 

404 with open( 

405 "docs/sphinx/src/.static/js/svg-pan-zoom.min.js", 

406 "r", 

407 encoding="utf-8", 

408 ) as h: 

409 svg_pan_zoom_js = h.read() 

410 

411 with open( 

412 "docs/sphinx/src/.static/js/custom.js", 

413 "r", 

414 encoding="utf-8", 

415 ) as h: 

416 custom_js = h.read() 

417 

418 with open(filename + ".html", "w", encoding="utf-8") as h: 

419 h.write( 

420 HTML_EMBED_SVG.format( 

421 svg_pan_zoom_js=svg_pan_zoom_js, 

422 custom_js=custom_js, 

423 svg_filename=Path(filename).name, 

424 ) 

425 ) 

426 

427 def show(self, xdg_open: bool = False) -> Any: 

428 """Show the dag. 

429 

430 Note 

431 ----- 

432 When the ``format`` is set to ``gv``, only the source file is generated and the 

433 user can generate the DAG manually with any layout engine and parameters. For 

434 example: ``dot -Gnslimit=2 -Tsvg git-dag.gv -o git-dag.gv.svg``, see `this 

435 <https://forum.graphviz.org/t/creating-a-dot-graph-with-thousands-of-nodes/1092/2>`_ 

436 thread. 

437 

438 Generating a DAG with more than 1000 nodes could be time-consuming. It is 

439 recommended to get an initial view using ``git dag -lrto`` and then limit to 

440 specific references and number of nodes using the ``-i`` and ``-n`` flags. 

441 

442 """ 

443 if self.params.public.format == "gv": 

444 with open(self.params.public.file, "w", encoding="utf-8") as h: 

445 h.write(self.dag.source()) 

446 else: 

447 self.dag.render() 

448 

449 filename_format = f"{self.params.public.file}.{self.params.public.format}" 

450 if xdg_open: # pragma: no cover 

451 subprocess.run( 

452 f"xdg-open {filename_format}", 

453 shell=True, 

454 check=True, 

455 ) 

456 

457 if self.params.public.format == "svg" and self.params.public.html_embed_svg: 

458 self._embed_svg_in_html(filename_format) 

459 

460 return self.dag.get() 

461 

462 def _is_object_to_include(self, sha: str) -> bool: 

463 """Return ``True`` if the object with given ``sha`` is to be displayed.""" 

464 if self.objects_sha_to_include is None: 

465 return True 

466 return sha in self.objects_sha_to_include 

467 

468 def _build_dag(self) -> None: 

469 # tags are not handled in this loop 

470 for sha, item in self.repository.objects.items(): 

471 to_include = self._is_object_to_include(sha) 

472 not_reachable = sha not in self.repository.all_reachable_objects_sha 

473 match item: 

474 case GitTree(): 

475 if not_reachable and self.params.public.show_trees_standalone: 

476 self._add_tree(sha, item, standalone=True) 

477 elif to_include and self.params.public.show_trees: 

478 self._add_tree(sha, item) 

479 case GitBlob(): 

480 if not_reachable and self.params.public.show_blobs_standalone: 

481 self._add_blob(sha, standalone=True) 

482 elif to_include and self.params.public.show_blobs: 

483 self._add_blob(sha) 

484 case GitCommit(): 

485 self._add_commit(sha, item) 

486 

487 # no point in displaying HEAD if branches are not displayed 

488 if self.params.public.show_local_branches: 

489 self._add_local_branches() 

490 if self.params.public.show_head: 490 ↛ 493line 490 didn't jump to line 493 because the condition on line 490 was always true

491 self._add_local_head() 

492 

493 if self.params.public.show_remote_branches: 

494 self._add_remote_branches() 

495 if self.params.public.show_head: 495 ↛ 498line 495 didn't jump to line 498 because the condition on line 495 was always true

496 self._add_remote_heads() 

497 

498 if self.params.public.show_tags: 

499 self._add_annotated_tags() 

500 self._add_lightweight_tags() 

501 

502 if self.params.public.show_stash: 

503 self._add_stashes() 

504 

505 self._add_prs_heads() 

506 self._add_annotations() 

507 

508 self.dag.build( 

509 format=self.params.public.format, 

510 node_attr=self.params.dag_node.model_dump(), 

511 edge_attr=self.params.dag_edge.model_dump(), 

512 dag_attr=self.params.dag_global.model_dump(), 

513 filename=self.params.public.file, 

514 cluster_params=self.params.standalone_cluster.model_dump(), 

515 )