[git] Migrate git method from Ruby to Python (#178)
The main reason for doing this is to have some common code between scripts, so that it is easier to change the JSON schema globally and normalize a few things (such as release order). The Ruby code was kept as is so we can quickly roll back if necessary.
This commit is contained in:
@@ -1,76 +1,66 @@
|
||||
from subprocess import call, Popen, PIPE, DEVNULL
|
||||
from typing import Union, List
|
||||
from pathlib import Path
|
||||
from hashlib import sha1
|
||||
from pathlib import Path
|
||||
from subprocess import run, PIPE
|
||||
|
||||
|
||||
class Git:
|
||||
"""
|
||||
Git cli wrapper
|
||||
used by debian.py and red-hat-openshift.py
|
||||
"""Git cli wrapper
|
||||
"""
|
||||
|
||||
def __init__(self, url: str):
|
||||
self.url: str = url
|
||||
self.repo_dir: Path = Path(
|
||||
f"~/.cache/git/{sha1(self.url.encode()).hexdigest()}"
|
||||
).expanduser()
|
||||
self.repo_dir: Path = Path(f"~/.cache/git/{sha1(url.encode()).hexdigest()}").expanduser()
|
||||
|
||||
def _run(self, cmd: str, return_output: bool = False) -> Union[bool, list]:
|
||||
def _run(self, cmd: str) -> list:
|
||||
"""Run git command and return command result as a list of lines.
|
||||
"""
|
||||
Run git command and return True on success or False on failure.
|
||||
Optionaly returns command result instead.
|
||||
"""
|
||||
git_opts = f"--git-dir={self.repo_dir}/.git --work-tree={self.repo_dir}"
|
||||
try:
|
||||
child = run(f"git {cmd}", capture_output=True, timeout=300, check=True, shell=True, cwd=self.repo_dir)
|
||||
return child.stdout.decode("utf-8").strip().split("\n")
|
||||
except ChildProcessError as ex:
|
||||
raise RuntimeError(f"Failed to run '{git_command}': {ex}")
|
||||
|
||||
if return_output:
|
||||
child = Popen(
|
||||
f"git {git_opts} {cmd}",
|
||||
shell=True,
|
||||
stdout=PIPE,
|
||||
)
|
||||
return child.communicate()[0].decode("utf-8").split("\n")
|
||||
|
||||
return call(f"git {git_opts} {cmd}",
|
||||
shell=True,
|
||||
stdout=DEVNULL,
|
||||
stderr=DEVNULL) == 0
|
||||
|
||||
def setup(self):
|
||||
"""
|
||||
Creates the repository path and runs:
|
||||
def setup(self, bare: bool = False):
|
||||
"""Creates the repository path and runs:
|
||||
git init
|
||||
git remote add origin $url
|
||||
"""
|
||||
self.repo_dir.mkdir(parents=True, exist_ok=True)
|
||||
if not Path(f"{self.repo_dir}/.git").exists():
|
||||
self._run("init")
|
||||
if not Path(f"{self.repo_dir}").exists():
|
||||
self.repo_dir.mkdir(parents=True, exist_ok=True)
|
||||
bare = "--bare" if bare else ""
|
||||
self._run(f"init {bare}")
|
||||
self._run(f"remote add origin {self.url}")
|
||||
|
||||
# See https://stackoverflow.com/a/65746233/374236
|
||||
def list_tags(self):
|
||||
"""Fetch and return tags matching the given`pattern`"""
|
||||
# See https://stackoverflow.com/a/65746233/374236
|
||||
self._run("config --local extensions.partialClone true")
|
||||
# Using --force to avoid error like "would clobber existing tag".
|
||||
# See https://stackoverflow.com/questions/58031165/how-to-get-rid-of-would-clobber-existing-tag.
|
||||
self._run("fetch --force --tags --filter=blob:none --depth=1 origin")
|
||||
tags_with_date = self._run("tag --list --format='%(refname:strip=2) %(creatordate:short)'")
|
||||
return [tag_with_date.split(" ") for tag_with_date in tags_with_date]
|
||||
|
||||
def list_branches(self, pattern: str):
|
||||
"""
|
||||
Uses ls-remote to fetch the branch names
|
||||
"""Uses ls-remote to fetch the branch names
|
||||
`pattern` uses fnmatch style globbing
|
||||
"""
|
||||
raw = self._run(
|
||||
# only list v4+ branches because the format in v3 is different
|
||||
f"ls-remote origin '{pattern}'",
|
||||
return_output=True,
|
||||
)
|
||||
lines = self._run(f"ls-remote origin '{pattern}'")
|
||||
|
||||
# this checks keeps the linter quiet, because _run returns a bool OR list
|
||||
# please dont remove
|
||||
if isinstance(raw, bool):
|
||||
if isinstance(lines, bool):
|
||||
return []
|
||||
|
||||
return [line.split("\t")[1][11:] for line in raw if "\t" in line]
|
||||
return [line.split("\t")[1][11:] for line in lines if "\t" in line]
|
||||
|
||||
def checkout(self, branch: str, file_list: List[str] = []):
|
||||
"""
|
||||
Checks out a branch
|
||||
If `file_list` is given, sparse-checkout is used to save bandwith
|
||||
def checkout(self, branch: str, file_list=None):
|
||||
"""Checks out a branch
|
||||
If `file_list` is given, sparse-checkout is used to save bandwidth
|
||||
and only download the given files
|
||||
"""
|
||||
if file_list:
|
||||
self._run(f"sparse-checkout set {' '.join(file_list)}")
|
||||
# --skip-checks needed to avoid error when file_list contains a file
|
||||
self._run(f"sparse-checkout set --skip-checks {' '.join(file_list)}")
|
||||
self._run(f"fetch --filter=blob:none --depth 1 origin {branch}")
|
||||
self._run(f"checkout {branch}")
|
||||
|
||||
Reference in New Issue
Block a user