Enable flake8-annotations linting rules (#267)

See https://docs.astral.sh/ruff/rules/#flake8-annotations-ann.
This commit is contained in:
Marc Wrobel
2023-12-30 10:38:17 +01:00
parent 0e8fe135e4
commit f49e3dff15
12 changed files with 51 additions and 47 deletions

View File

@@ -1,4 +1,5 @@
select = [ select = [
"ANN", # flake8-annotations
"B", # flake8-bugbear "B", # flake8-bugbear
"C90", # mccabe "C90", # mccabe
"E", # pycodestyle errors "E", # pycodestyle errors
@@ -13,5 +14,6 @@ select = [
"YTT", # flake8-2020 "YTT", # flake8-2020
] ]
extend-ignore = [ extend-ignore = [
"ANN101", # Missing type annotation for self in method
"E501", # Line too long "E501", # Line too long
] ]

View File

@@ -21,22 +21,22 @@ This is written in Python because the only package that supports writing back YA
class ReleaseCycle: class ReleaseCycle:
def __init__(self, data): def __init__(self, data: dict) -> None:
self.data = data self.data = data
self.name = data["releaseCycle"] self.name = data["releaseCycle"]
self.matched = False self.matched = False
self.updated = False self.updated = False
def update_with(self, version, date): def update_with(self, version: str, date: datetime.date) -> None:
logging.debug(f"will try to update {self.name} with {version} ({date})") logging.debug(f"will try to update {self.name} with {version} ({date})")
self.matched = True self.matched = True
self.__update_release_date(version, date) self.__update_release_date(version, date)
self.__update_latest(version, date) self.__update_latest(version, date)
def latest(self): def latest(self) -> str | None:
return self.data.get("latest", None) return self.data.get("latest", None)
def includes(self, version): def includes(self, version: str) -> bool:
"""matches releases that are exact (such as 4.1 being the first release for the 4.1 release cycle) """matches releases that are exact (such as 4.1 being the first release for the 4.1 release cycle)
or releases that include a dot just after the release cycle (4.1.*) or releases that include a dot just after the release cycle (4.1.*)
This is important to avoid edge cases like a 4.10.x release being marked under the 4.1 release cycle.""" This is important to avoid edge cases like a 4.10.x release being marked under the 4.1 release cycle."""
@@ -54,14 +54,14 @@ class ReleaseCycle:
or char_after_prefix.isalpha() # build number: prefix = 1.1.0, r = 1.1.0r (ex. openssl) or char_after_prefix.isalpha() # build number: prefix = 1.1.0, r = 1.1.0r (ex. openssl)
) )
def __update_release_date(self, version, date): def __update_release_date(self, version: str, date: datetime.date) -> None:
release_date = self.data.get("releaseDate", None) release_date = self.data.get("releaseDate", None)
if release_date and release_date > date: if release_date and release_date > date:
logging.info(f"{self.name} release date updated from {release_date} to {date} ({version})") logging.info(f"{self.name} release date updated from {release_date} to {date} ({version})")
self.data["releaseDate"] = date self.data["releaseDate"] = date
self.updated = True self.updated = True
def __update_latest(self, version, date): def __update_latest(self, version: str, date: datetime.date) -> None:
old_latest = self.data.get("latest", None) old_latest = self.data.get("latest", None)
old_latest_date = self.data.get("latestReleaseDate", None) old_latest_date = self.data.get("latestReleaseDate", None)
@@ -87,12 +87,12 @@ class ReleaseCycle:
self.data["latestReleaseDate"] = date self.data["latestReleaseDate"] = date
self.updated = True self.updated = True
def __str__(self): def __str__(self) -> str:
return self.name return self.name
class Product: class Product:
def __init__(self, name: str, product_dir: Path, versions_dir: Path): def __init__(self, name: str, product_dir: Path, versions_dir: Path) -> None:
self.name = name self.name = name
self.product_path = product_dir / f"{name}.md" self.product_path = product_dir / f"{name}.md"
self.versions_path = versions_dir / f"{name}.json" self.versions_path = versions_dir / f"{name}.json"
@@ -114,13 +114,13 @@ class Product:
self.updated = False self.updated = False
self.unmatched_versions = {} self.unmatched_versions = {}
def check_latest(self): def check_latest(self) -> None:
for release in self.releases: for release in self.releases:
latest = release.latest() latest = release.latest()
if release.matched and latest not in self.versions.keys(): if release.matched and latest not in self.versions.keys():
logging.info(f"latest version {latest} for {release.name} not found in {self.versions_path}") logging.info(f"latest version {latest} for {release.name} not found in {self.versions_path}")
def process_version(self, version: str, date_str: str): def process_version(self, version: str, date_str: str) -> None:
date = datetime.date.fromisoformat(date_str) date = datetime.date.fromisoformat(date_str)
version_matched = False version_matched = False
@@ -133,7 +133,7 @@ class Product:
if not version_matched: if not version_matched:
self.unmatched_versions[version] = date self.unmatched_versions[version] = date
def write(self): def write(self) -> None:
with open(self.product_path, "w") as product_file: with open(self.product_path, "w") as product_file:
product_file.truncate() product_file.truncate()
product_file.write("---\n") product_file.write("---\n")
@@ -147,14 +147,14 @@ class Product:
product_file.write("\n") product_file.write("\n")
def github_output(message): def github_output(message: str) -> None:
logging.debug(f"GITHUB_OUTPUT += {message.strip()}") logging.debug(f"GITHUB_OUTPUT += {message.strip()}")
if os.getenv("GITHUB_OUTPUT"): if os.getenv("GITHUB_OUTPUT"):
with open(os.getenv("GITHUB_OUTPUT"), 'a') as f: with open(os.getenv("GITHUB_OUTPUT"), 'a') as f:
f.write(message) f.write(message)
def update_product(name, product_dir, releases_dir): def update_product(name: str, product_dir: Path, releases_dir: Path) -> None:
versions_path = releases_dir / f"{name}.json" versions_path = releases_dir / f"{name}.json"
if not exists(versions_path): if not exists(versions_path):
logging.debug(f"Skipping {name}, {versions_path} does not exist") logging.debug(f"Skipping {name}, {versions_path} does not exist")

View File

@@ -2,25 +2,25 @@ import calendar
from datetime import datetime, timezone from datetime import datetime, timezone
def parse_date(text, formats=frozenset([ def parse_date(text: str, formats: frozenset[str] = frozenset([
"%B %d %Y", # January 1 2020 "%B %d %Y", # January 1 2020
"%b %d %Y", # Jan 1 2020 "%b %d %Y", # Jan 1 2020
"%d %B %Y", # 1 January 2020 "%d %B %Y", # 1 January 2020
"%d %b %Y", # 1 Jan 2020 "%d %b %Y", # 1 Jan 2020
"%d-%b-%Y", # 1-Jan-2020 "%d-%b-%Y", # 1-Jan-2020
"%d-%B-%Y", # 1-January-2020 "%d-%B-%Y", # 1-January-2020
"%Y-%m-%d", # 2020-01-01 "%Y-%m-%d", # 2020-01-01
"%m/%d/%Y", # 01/25/2020 "%m/%d/%Y", # 01/25/2020
])) -> datetime: ])) -> datetime:
"""Parse a given text representing a date using a list of formats. """Parse a given text representing a date using a list of formats.
""" """
return parse_datetime(text, formats, to_utc=False) return parse_datetime(text, formats, to_utc=False)
def parse_month_year_date(text, formats=frozenset([ def parse_month_year_date(text: str, formats: frozenset[str] = frozenset([
"%B %Y", # January 2020 "%B %Y", # January 2020
"%b %Y", # Jan 2020 "%b %Y", # Jan 2020
])) -> datetime: ])) -> datetime:
"""Parse a given text representing a partial date using a list of formats, """Parse a given text representing a partial date using a list of formats,
adjusting it to the last day of the month. adjusting it to the last day of the month.
""" """
@@ -29,14 +29,14 @@ def parse_month_year_date(text, formats=frozenset([
return date.replace(day=last_day) return date.replace(day=last_day)
def parse_datetime(text, formats=frozenset([ def parse_datetime(text: str, formats: frozenset[str] = frozenset([
"%Y-%m-%d %H:%M:%S", # 2023-05-01 08:32:34 "%Y-%m-%d %H:%M:%S", # 2023-05-01 08:32:34
"%Y-%m-%dT%H:%M:%S", # 2023-05-01T08:32:34 "%Y-%m-%dT%H:%M:%S", # 2023-05-01T08:32:34
"%Y-%m-%d %H:%M:%S %z", # 2023-05-01 08:32:34 +0900 "%Y-%m-%d %H:%M:%S %z", # 2023-05-01 08:32:34 +0900
"%Y-%m-%dT%H:%M:%S%z", # 2023-05-01T08:32:34+0900 "%Y-%m-%dT%H:%M:%S%z", # 2023-05-01T08:32:34+0900
"%Y-%m-%dT%H:%M:%S.%f%z", # 2023-05-01T08:32:34.123456Z "%Y-%m-%dT%H:%M:%S.%f%z", # 2023-05-01T08:32:34.123456Z
"%a %d %b %Y %H:%M:%S %Z", # Wed, 01 Jan 2020 00:00:00 GMT "%a %d %b %Y %H:%M:%S %Z", # Wed, 01 Jan 2020 00:00:00 GMT
]), to_utc=True) -> datetime: ]), to_utc: bool = True) -> datetime:
"""Parse a given text representing a datetime using a list of formats, """Parse a given text representing a datetime using a list of formats,
optionally converting it to UTC. optionally converting it to UTC.
""" """

View File

@@ -21,7 +21,7 @@ VERSIONS_PATH = os.environ.get("VERSIONS_PATH", "releases")
class AutoConfig: class AutoConfig:
def __init__(self, method: str, config: dict): def __init__(self, method: str, config: dict) -> None:
self.method = method self.method = method
self.url = config[method] self.url = config[method]
self.version_template = Template(config.get("template", DEFAULT_VERSION_TEMPLATE)) self.version_template = Template(config.get("template", DEFAULT_VERSION_TEMPLATE))
@@ -41,7 +41,7 @@ class AutoConfig:
class ProductFrontmatter: class ProductFrontmatter:
def __init__(self, name: str): def __init__(self, name: str) -> None:
self.name: str = name self.name: str = name
self.path: str = f"{PRODUCTS_PATH}/{name}.md" self.path: str = f"{PRODUCTS_PATH}/{name}.md"
@@ -72,13 +72,13 @@ class ProductFrontmatter:
class Product: class Product:
def __init__(self, name: str): def __init__(self, name: str) -> None:
self.name: str = name self.name: str = name
self.versions_path: str = f"{VERSIONS_PATH}/{name}.json" self.versions_path: str = f"{VERSIONS_PATH}/{name}.json"
self.versions = {} self.versions = {}
@staticmethod @staticmethod
def from_file(name: str): def from_file(name: str) -> "Product":
product = Product(name) product = Product(name)
if not os.path.isfile(product.versions_path): if not os.path.isfile(product.versions_path):
@@ -137,7 +137,7 @@ class Product:
return f"<{self.name}>" return f"<{self.name}>"
def list_products(method, products_filter=None) -> list[str]: def list_products(method: str, products_filter: str | None = None) -> list[str]:
"""Return a list of products that are using the same given update method. """Return a list of products that are using the same given update method.
""" """
products = [] products = []

View File

@@ -8,7 +8,7 @@ class Git:
"""Git cli wrapper """Git cli wrapper
""" """
def __init__(self, url: str): def __init__(self, url: str) -> None:
self.url: str = url self.url: str = url
self.repo_dir: Path = Path(f"~/.cache/git/{sha1(url.encode()).hexdigest()}").expanduser() self.repo_dir: Path = Path(f"~/.cache/git/{sha1(url.encode()).hexdigest()}").expanduser()
@@ -22,7 +22,7 @@ class Git:
except ChildProcessError as ex: except ChildProcessError as ex:
raise RuntimeError(f"Failed to run '{cmd}': {ex}") from ex raise RuntimeError(f"Failed to run '{cmd}': {ex}") from ex
def setup(self, bare: bool = False): def setup(self, bare: bool = False) -> None:
"""Creates the repository path and runs: """Creates the repository path and runs:
git init git init
git remote add origin $url git remote add origin $url
@@ -34,7 +34,7 @@ class Git:
self._run(f"remote add origin {self.url}") self._run(f"remote add origin {self.url}")
# See https://stackoverflow.com/a/65746233/374236 # See https://stackoverflow.com/a/65746233/374236
def list_tags(self): def list_tags(self) -> list[tuple[str, str]]:
"""Fetch and return tags matching the given `pattern`""" """Fetch and return tags matching the given `pattern`"""
# See https://stackoverflow.com/a/65746233/374236 # See https://stackoverflow.com/a/65746233/374236
self._run("config --local extensions.partialClone true") self._run("config --local extensions.partialClone true")
@@ -44,7 +44,7 @@ class Git:
tags_with_date = self._run("tag --list --format='%(refname:strip=2) %(creatordate:short)'") tags_with_date = self._run("tag --list --format='%(refname:strip=2) %(creatordate:short)'")
return [tag_with_date.split(" ") for tag_with_date in tags_with_date] return [tag_with_date.split(" ") for tag_with_date in tags_with_date]
def list_branches(self, pattern: str): def list_branches(self, pattern: str) -> list[str]:
"""Uses ls-remote to fetch the branch names """Uses ls-remote to fetch the branch names
`pattern` uses fnmatch style globbing `pattern` uses fnmatch style globbing
""" """
@@ -56,7 +56,7 @@ class Git:
return [line.split("\t")[1][11:] for line in lines if "\t" in line] return [line.split("\t")[1][11:] for line in lines if "\t" in line]
def checkout(self, branch: str, file_list: list[str] = None): def checkout(self, branch: str, file_list: list[str] | None = None) -> None:
"""Checks out a branch """Checks out a branch
If `file_list` is given, sparse-checkout is used to save bandwidth If `file_list` is given, sparse-checkout is used to save bandwidth
and only download the given files and only download the given files

View File

@@ -38,6 +38,6 @@ def fetch_urls(urls: list[str], data: any = None, headers: dict[str, str] = None
return fetch_urls(urls, data, headers, next_max_retries, backoff_factor, timeout) return fetch_urls(urls, data, headers, next_max_retries, backoff_factor, timeout)
def fetch_url(url, data: any = None, headers: dict[str, str] = None, def fetch_url(url: str, data: any = None, headers: dict[str, str] | None = None,
max_retries: int = 10, backoff_factor: float = 0.5, timeout: int = 30) -> Response: max_retries: int = 10, backoff_factor: float = 0.5, timeout: int = 30) -> Response:
return fetch_urls([url], data, headers, max_retries, backoff_factor, timeout)[0] return fetch_urls([url], data, headers, max_retries, backoff_factor, timeout)[0]

View File

@@ -1,3 +1,4 @@
import datetime
import re import re
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
@@ -7,7 +8,7 @@ MILESTONE_PATTERN = re.compile(r'COS \d+ LTS')
VERSION_PATTERN = re.compile(r"^(cos-\d+-\d+-\d+-\d+)") VERSION_PATTERN = re.compile(r"^(cos-\d+-\d+-\d+-\d+)")
def parse_date(date_text): def parse_date(date_text: str) -> datetime.datetime:
date_text = date_text.strip().replace('Date: ', '') date_text = date_text.strip().replace('Date: ', '')
date_text = re.sub(r'Sep[a-zA-Z]+', 'Sep', date_text) date_text = re.sub(r'Sep[a-zA-Z]+', 'Sep', date_text)
return dates.parse_date(date_text) return dates.parse_date(date_text)

View File

@@ -1,3 +1,4 @@
from pathlib import Path
from subprocess import run from subprocess import run
from common import dates, endoflife from common import dates, endoflife
@@ -6,7 +7,7 @@ from common.git import Git
"""Fetch Debian versions by parsing news in www.debian.org source repository.""" """Fetch Debian versions by parsing news in www.debian.org source repository."""
def extract_major_versions(product, repo_dir): def extract_major_versions(product: endoflife.Product, repo_dir: Path) -> None:
child = run( child = run(
f"grep -RhE -A 1 '<define-tag pagetitle>Debian [0-9]+.+</q> released' {repo_dir}/english/News " f"grep -RhE -A 1 '<define-tag pagetitle>Debian [0-9]+.+</q> released' {repo_dir}/english/News "
f"| cut -d '<' -f 2 " f"| cut -d '<' -f 2 "
@@ -25,7 +26,7 @@ def extract_major_versions(product, repo_dir):
is_release_line = True is_release_line = True
def extract_point_versions(product, repo_dir): def extract_point_versions(product: endoflife.Product, repo_dir: Path) -> None:
child = run( child = run(
f"grep -Rh -B 10 '<define-tag revision>' {repo_dir}/english/News " f"grep -Rh -B 10 '<define-tag revision>' {repo_dir}/english/News "
"| grep -Eo '(release_date>(.*)<|revision>(.*)<)' " "| grep -Eo '(release_date>(.*)<|revision>(.*)<)' "

View File

@@ -9,7 +9,7 @@ Unfortunately images creation date cannot be retrieved, so we had to use the tag
METHOD = "docker_hub" METHOD = "docker_hub"
def fetch_releases(product, config, url): def fetch_releases(product: endoflife.Product, config: endoflife.AutoConfig, url: str) -> None:
data = http.fetch_url(url).json() data = http.fetch_url(url).json()
for result in data["results"]: for result in data["results"]:

View File

@@ -13,7 +13,7 @@ Note: GraphQL API and GitHub CLI are used because it's simpler: no need to manag
METHOD = "github_releases" METHOD = "github_releases"
def fetch_releases(repo_id): def fetch_releases(repo_id: str) -> list[dict]:
logging.info(f"fetching {repo_id} GitHub releases") logging.info(f"fetching {repo_id} GitHub releases")
(owner, repo) = repo_id.split('/') (owner, repo) = repo_id.split('/')
child = subprocess.run("""gh api graphql --paginate -f query=' child = subprocess.run("""gh api graphql --paginate -f query='

View File

@@ -6,7 +6,7 @@ from common import dates, endoflife, http
VERSION_DATE_PATTERN = re.compile(r"Splunk Enterprise (?P<version>\d+\.\d+(?:\.\d+)*) was (?:first )?released on (?P<date>\w+\s\d\d?,\s\d{4})\.", re.MULTILINE) VERSION_DATE_PATTERN = re.compile(r"Splunk Enterprise (?P<version>\d+\.\d+(?:\.\d+)*) was (?:first )?released on (?P<date>\w+\s\d\d?,\s\d{4})\.", re.MULTILINE)
def get_latest_minor_versions(versions): def get_latest_minor_versions(versions: list[str]) -> list[str]:
versions_split = [v.split('.') for v in versions] versions_split = [v.split('.') for v in versions]
# Group versions by major and minor version # Group versions by major and minor version

View File

@@ -10,7 +10,7 @@ from pathlib import Path
from deepdiff import DeepDiff from deepdiff import DeepDiff
def github_output(name, value): def github_output(name: str, value: str) -> None:
if "GITHUB_OUTPUT" not in os.environ: if "GITHUB_OUTPUT" not in os.environ:
logging.debug(f"GITHUB_OUTPUT does not exist, but would have written: {name}={value.strip()}") logging.debug(f"GITHUB_OUTPUT does not exist, but would have written: {name}={value.strip()}")
return return
@@ -28,7 +28,7 @@ def github_output(name, value):
logging.debug(f"Wrote to GITHUB_OUTPUT: {name}={value.strip()}") logging.debug(f"Wrote to GITHUB_OUTPUT: {name}={value.strip()}")
def add_summary_line(line): def add_summary_line(line: str) -> None:
if "GITHUB_STEP_SUMMARY" not in os.environ: if "GITHUB_STEP_SUMMARY" not in os.environ:
logging.debug(f"GITHUB_STEP_SUMMARY does not exist, but would have written: {line}") logging.debug(f"GITHUB_STEP_SUMMARY does not exist, but would have written: {line}")
return return