Enable flake8-use-pathlib linting rules (#267)

See https://docs.astral.sh/ruff/rules/#flake8-use-pathlib-pth.
This commit is contained in:
Marc Wrobel
2023-12-30 12:11:21 +01:00
parent 5fa597afc0
commit 47e29992ae
7 changed files with 42 additions and 41 deletions

View File

@@ -18,6 +18,7 @@ select = [
"N", # pep8-naming "N", # pep8-naming
"PIE", # flake8-pie "PIE", # flake8-pie
"PGH", # pygrep-hooks "PGH", # pygrep-hooks
"PTH", # flake8-use-pathlib
"RET", # flake8-return "RET", # flake8-return
"RUF100", # unused noqa (yesqa) "RUF100", # unused noqa (yesqa)
"SLF", # flake8-self "SLF", # flake8-self

View File

@@ -4,7 +4,6 @@ import json
import logging import logging
import os import os
import re import re
from os.path import exists
from pathlib import Path from pathlib import Path
import frontmatter import frontmatter
@@ -97,7 +96,7 @@ class Product:
self.product_path = product_dir / f"{name}.md" self.product_path = product_dir / f"{name}.md"
self.versions_path = versions_dir / f"{name}.json" self.versions_path = versions_dir / f"{name}.json"
with open(self.product_path) as product_file: with self.product_path.open() as product_file:
# First read the frontmatter of the product file. # First read the frontmatter of the product file.
yaml = YAML() yaml = YAML()
yaml.preserve_quotes = True yaml.preserve_quotes = True
@@ -107,7 +106,7 @@ class Product:
product_file.seek(0) product_file.seek(0)
_, self.content = frontmatter.parse(product_file.read()) _, self.content = frontmatter.parse(product_file.read())
with open(self.versions_path) as versions_file: with self.versions_path.open() as versions_file:
self.versions = json.loads(versions_file.read()) self.versions = json.loads(versions_file.read())
self.releases = [ReleaseCycle(release) for release in self.data["releases"]] self.releases = [ReleaseCycle(release) for release in self.data["releases"]]
@@ -134,7 +133,7 @@ class Product:
self.unmatched_versions[version] = date self.unmatched_versions[version] = date
def write(self) -> None: def write(self) -> None:
with open(self.product_path, "w") as product_file: with self.product_path.open("w") as product_file:
product_file.truncate() product_file.truncate()
product_file.write("---\n") product_file.write("---\n")
@@ -150,13 +149,13 @@ class Product:
def github_output(message: str) -> None: def github_output(message: str) -> None:
logging.debug(f"GITHUB_OUTPUT += {message.strip()}") logging.debug(f"GITHUB_OUTPUT += {message.strip()}")
if os.getenv("GITHUB_OUTPUT"): if os.getenv("GITHUB_OUTPUT"):
with open(os.getenv("GITHUB_OUTPUT"), 'a') as f: with open(os.getenv("GITHUB_OUTPUT"), 'a') as f: # NOQA: PTH123
f.write(message) f.write(message)
def update_product(name: str, product_dir: Path, releases_dir: Path) -> None: def update_product(name: str, product_dir: Path, releases_dir: Path) -> None:
versions_path = releases_dir / f"{name}.json" versions_path = releases_dir / f"{name}.json"
if not exists(versions_path): if not versions_path.exists():
logging.debug(f"Skipping {name}, {versions_path} does not exist") logging.debug(f"Skipping {name}, {versions_path} does not exist")
return return

View File

@@ -1,15 +1,15 @@
import sys import sys
import time import time
from glob import glob from pathlib import Path
import frontmatter import frontmatter
products = {} products = {}
count = 0 count = 0
count_auto = 0 count_auto = 0
products_dir = sys.argv[1] if len(sys.argv) > 1 else 'website/products/' products_dir = Path(sys.argv[1] if len(sys.argv) > 1 else 'website/products/')
for product_file in sorted(glob(f'{products_dir}/*.md')): for product_file in sorted(products_dir.glob('*.md')):
with open(product_file) as f: with product_file.open() as f:
data = frontmatter.load(f) data = frontmatter.load(f)
count += 1 count += 1
title = data['title'] title = data['title']

View File

@@ -27,7 +27,7 @@ for branch in git.list_branches("refs/heads/?.?.x"):
if not release_notes_file.exists(): if not release_notes_file.exists():
continue continue
with open(release_notes_file, "rb") as f: with release_notes_file.open("rb") as f:
release_notes = f.read().decode("utf-8", errors="ignore") release_notes = f.read().decode("utf-8", errors="ignore")
for pattern in VERSION_AND_DATE_PATTERNS: for pattern in VERSION_AND_DATE_PATTERNS:

View File

@@ -3,7 +3,7 @@ import logging
import os import os
import re import re
from datetime import datetime, timezone from datetime import datetime, timezone
from glob import glob from pathlib import Path
import frontmatter import frontmatter
from liquid import Template from liquid import Template
@@ -16,8 +16,8 @@ DEFAULT_VERSION_REGEX = r"^v?(?P<major>[1-9]\d*)\.(?P<minor>\d+)(\.(?P<patch>\d+
DEFAULT_VERSION_PATTERN = re.compile(DEFAULT_VERSION_REGEX) DEFAULT_VERSION_PATTERN = re.compile(DEFAULT_VERSION_REGEX)
DEFAULT_VERSION_TEMPLATE = "{{major}}{% if minor %}.{{minor}}{% if patch %}.{{patch}}{% if tiny %}.{{tiny}}{% endif %}{% endif %}{% endif %}" DEFAULT_VERSION_TEMPLATE = "{{major}}{% if minor %}.{{minor}}{% if patch %}.{{patch}}{% if tiny %}.{{tiny}}{% endif %}{% endif %}{% endif %}"
PRODUCTS_PATH = os.environ.get("PRODUCTS_PATH", "website/products") PRODUCTS_PATH = Path(os.environ.get("PRODUCTS_PATH", "website/products"))
VERSIONS_PATH = os.environ.get("VERSIONS_PATH", "releases") VERSIONS_PATH = Path(os.environ.get("VERSIONS_PATH", "releases"))
class AutoConfig: class AutoConfig:
@@ -44,11 +44,11 @@ class AutoConfig:
class ProductFrontmatter: class ProductFrontmatter:
def __init__(self, name: str) -> None: def __init__(self, name: str) -> None:
self.name: str = name self.name: str = name
self.path: str = f"{PRODUCTS_PATH}/{name}.md" self.path: Path = PRODUCTS_PATH / f"{name}.md"
self.data = None self.data = None
if os.path.isfile(self.path): if self.path.is_file():
with open(self.path) as f: with self.path.open() as f:
self.data = frontmatter.load(f) self.data = frontmatter.load(f)
logging.info(f"loaded product data for {self.name} from {self.path}") logging.info(f"loaded product data for {self.name} from {self.path}")
else: else:
@@ -76,15 +76,15 @@ class ProductFrontmatter:
class Product: class Product:
def __init__(self, name: str) -> None: def __init__(self, name: str) -> None:
self.name: str = name self.name: str = name
self.versions_path: str = f"{VERSIONS_PATH}/{name}.json" self.versions_path: Path = VERSIONS_PATH / f"{name}.json"
self.versions = {} self.versions = {}
@staticmethod @staticmethod
def from_file(name: str) -> "Product": def from_file(name: str) -> "Product":
product = Product(name) product = Product(name)
if not os.path.isfile(product.versions_path): if product.versions_path.is_file():
with open(product.versions_path) as f: with product.versions_path.open() as f:
for version, date in json.load(f).items(): for version, date in json.load(f).items():
date_obj = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc) date_obj = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
product.versions[version] = date_obj product.versions[version] = date_obj
@@ -131,7 +131,7 @@ class Product:
def write(self) -> None: def write(self) -> None:
versions = {version: date.strftime("%Y-%m-%d") for version, date in self.versions.items()} versions = {version: date.strftime("%Y-%m-%d") for version, date in self.versions.items()}
with open(self.versions_path, "w") as f: with self.versions_path.open("w") as f:
f.write(json.dumps(dict( f.write(json.dumps(dict(
# sort by date then version (desc) # sort by date then version (desc)
sorted(versions.items(), key=lambda x: (x[1], x[0]), reverse=True), sorted(versions.items(), key=lambda x: (x[1], x[0]), reverse=True),
@@ -146,12 +146,12 @@ def list_products(method: str, products_filter: str = None) -> list[str]:
""" """
products = [] products = []
for product_file in glob(f"{PRODUCTS_PATH}/*.md"): for product_file in PRODUCTS_PATH.glob("*.md"):
product_name = os.path.splitext(os.path.basename(product_file))[0] product_name = product_file.stem
if products_filter and product_name != products_filter: if products_filter and product_name != products_filter:
continue continue
with open(product_file) as f: with product_file.open() as f:
data = frontmatter.load(f) data = frontmatter.load(f)
if "auto" in data: if "auto" in data:
matching_configs = list(filter(lambda config: method in config, data["auto"])) matching_configs = list(filter(lambda config: method in config, data["auto"]))

View File

@@ -22,7 +22,7 @@ for branch in git.list_branches("refs/heads/enterprise-[4-9]*"):
if not release_notes_file.exists(): if not release_notes_file.exists():
continue continue
with open(release_notes_file, "rb") as f: with release_notes_file.open("rb") as f:
content = f.read().decode("utf-8") content = f.read().decode("utf-8")
for (version, date_str) in VERSION_AND_DATE_PATTERN.findall(content): for (version, date_str) in VERSION_AND_DATE_PATTERN.findall(content):
product.declare_version(version, dates.parse_date(date_str)) product.declare_version(version, dates.parse_date(date_str))

View File

@@ -23,7 +23,7 @@ def github_output(name: str, value: str) -> None:
else: else:
command = f"{name}={value}" command = f"{name}={value}"
with open(os.environ["GITHUB_OUTPUT"], 'a') as github_output_var: with open(os.environ["GITHUB_OUTPUT"], 'a') as github_output_var: # NOQA: PTH123
print(command, file=github_output_var) print(command, file=github_output_var)
logging.debug(f"Wrote to GITHUB_OUTPUT: {name}={value.strip()}") logging.debug(f"Wrote to GITHUB_OUTPUT: {name}={value.strip()}")
@@ -33,17 +33,17 @@ def add_summary_line(line: str) -> None:
logging.debug(f"GITHUB_STEP_SUMMARY does not exist, but would have written: {line}") logging.debug(f"GITHUB_STEP_SUMMARY does not exist, but would have written: {line}")
return return
with open(os.environ["GITHUB_STEP_SUMMARY"], 'a') as github_step_summary: with open(os.environ["GITHUB_STEP_SUMMARY"], 'a') as github_step_summary: # NOQA: PTH123
print(line, file=github_step_summary) print(line, file=github_step_summary)
SRC_DIR = 'src' SRC_DIR = Path('src')
DATA_DIR = 'releases' DATA_DIR = Path('releases')
logging.basicConfig(format=logging.BASIC_FORMAT, level=logging.INFO) logging.basicConfig(format=logging.BASIC_FORMAT, level=logging.INFO)
# Run scripts # Run scripts
scripts = sorted([os.path.join(SRC_DIR, file) for file in os.listdir(SRC_DIR) if file.endswith('.py')]) scripts = sorted([SRC_DIR / file for file in os.listdir(SRC_DIR) if file.endswith('.py')])
some_script_failed = False some_script_failed = False
add_summary_line("## Script execution summary\n") add_summary_line("## Script execution summary\n")
@@ -53,7 +53,7 @@ for script in scripts:
logging.info(f"start running {script}") logging.info(f"start running {script}")
start = time.perf_counter() start = time.perf_counter()
child = subprocess.run([sys.executable, script], timeout=300) child = subprocess.run([sys.executable, script]) # timeout handled in subscripts
elapsed_seconds = time.perf_counter() - start elapsed_seconds = time.perf_counter() - start
if child.returncode != 0: if child.returncode != 0:
@@ -67,34 +67,35 @@ for script in scripts:
# Generate commit message # Generate commit message
subprocess.run('git add --all', timeout=10, check=True, shell=True) # to also get new files in git diff subprocess.run('git add --all', timeout=10, check=True, shell=True) # to also get new files in git diff
git_diff = subprocess.run('git diff --name-only --staged', capture_output=True, timeout=10, check=True, shell=True) git_diff = subprocess.run('git diff --name-only --staged', capture_output=True, timeout=10, check=True, shell=True)
updated_files = sorted([Path(file) for file in git_diff.stdout.decode('utf-8').split('\n') if file.startswith(DATA_DIR)]) updated_files = [Path(file) for file in git_diff.stdout.decode('utf-8').split('\n')]
logging.info(f"Updated files: {updated_files}") updated_product_files = sorted([file for file in updated_files if file.parent == DATA_DIR])
logging.info(f"Updated product files: {[file.name for file in updated_product_files]}")
add_summary_line("## Update summary\n") add_summary_line("## Update summary\n")
if updated_files: if updated_product_files:
# get modified files content # get modified files content
new_files_content = {} new_files_content = {}
for path in updated_files: for path in updated_product_files:
with open(path) as file: with path.open() as file:
new_files_content[path] = json.load(file) new_files_content[path] = json.load(file)
# get original files content # get original files content
old_files_content = {} old_files_content = {}
subprocess.run('git stash --all --quiet', timeout=10, check=True, shell=True) subprocess.run('git stash --all --quiet', timeout=10, check=True, shell=True)
for path in updated_files: for path in updated_product_files:
if path.exists(): if path.exists():
with open(path) as file: with path.open() as file:
old_files_content[path] = json.load(file) old_files_content[path] = json.load(file)
else: # new file else: # new file
old_files_content[path] = {} old_files_content[path] = {}
subprocess.run('git stash pop --quiet', timeout=10, check=True, shell=True) subprocess.run('git stash pop --quiet', timeout=10, check=True, shell=True)
# Generate commit message # Generate commit message
product_names = ', '.join([path.stem for path in updated_files]) product_names = ', '.join([path.stem for path in updated_product_files])
commit_message = f"🤖: {product_names}\n\n" commit_message = f"🤖: {product_names}\n\n"
add_summary_line(f"Updated {len(updated_files)} products: {product_names}.") add_summary_line(f"Updated {len(updated_product_files)} products: {product_names}.")
for path in updated_files: for path in updated_product_files:
add_summary_line(f"### {path.stem}\n") add_summary_line(f"### {path.stem}\n")
commit_message += f"{path.stem}:\n" commit_message += f"{path.stem}:\n"