Refactor latest.py and update.py (#270)

- create new gha.py module,
- improve GitHub workflow commands (GITHUB_OUTPUT, GITHUB_STEP_SUMMARY) interactions,
- improve logging,
- split update.py into smaller functions.
This commit is contained in:
Marc Wrobel
2023-12-31 00:45:30 +01:00
committed by GitHub
parent 23883d98e3
commit 7a97c87b3b
4 changed files with 139 additions and 113 deletions

View File

@@ -77,6 +77,6 @@ jobs:
commit_author: 'github-actions[bot] <github-actions[bot]@users.noreply.github.com>'
# we still want to easily know if something went wrong
- name: Set job status
- name: Restore update.py failure
if: steps.update_data.outcome != 'success'
run: exit 1

View File

@@ -2,7 +2,6 @@ import argparse
import datetime
import json
import logging
import os
import re
from pathlib import Path
@@ -12,6 +11,8 @@ from ruamel.yaml import YAML
from ruamel.yaml.representer import RoundTripRepresenter
from ruamel.yaml.resolver import Resolver
from src.common.gha import GitHubOutput
"""
Updates the `release`, `latest` and `latestReleaseDate` property in automatically updated pages
As per data from _data/release-data. This script runs on dependabot upgrade PRs via GitHub Actions for
@@ -21,14 +22,15 @@ This is written in Python because the only package that supports writing back YA
class ReleaseCycle:
def __init__(self, data: dict) -> None:
def __init__(self, product_name: str, data: dict) -> None:
self.product_name = product_name
self.data = data
self.name = data["releaseCycle"]
self.matched = False
self.updated = False
def update_with(self, version: str, date: datetime.date) -> None:
logging.debug(f"will try to update {self.name} with {version} ({date})")
logging.debug(f"will try to update {self} with {version} ({date})")
self.matched = True
self.__update_release_date(version, date)
self.__update_latest(version, date)
@@ -57,7 +59,7 @@ class ReleaseCycle:
def __update_release_date(self, version: str, date: datetime.date) -> None:
release_date = self.data.get("releaseDate", None)
if release_date and release_date > date:
logging.info(f"{self.name} release date updated from {release_date} to {date} ({version})")
logging.info(f"{self} release date updated from {release_date} to {date} ({version})")
self.data["releaseDate"] = date
self.updated = True
@@ -67,20 +69,20 @@ class ReleaseCycle:
update_detected = False
if not old_latest:
logging.info(f"{self.name} latest date updated to {version} ({date}) (no prior latest version)")
logging.info(f"{self} latest date updated to {version} ({date}) (no prior latest version)")
update_detected = True
elif old_latest == version and old_latest_date != date:
logging.info(f"{self.name} latest date updated from {old_latest_date} to {date}")
logging.info(f"{self} latest date updated from {old_latest_date} to {date}")
update_detected = True
else:
try: # Do our best attempt at comparing the version numbers
if Version(old_latest) < Version(version):
logging.info(f"{self.name} latest updated from {old_latest} ({old_latest_date}) to {version} ({date})")
logging.info(f"{self} latest updated from {old_latest} ({old_latest_date}) to {version} ({date})")
update_detected = True
except InvalidVersion:
logging.debug(f"could not compare {self.name} with {version}, skipping")
logging.debug(f"could not compare {self} with {version}, skipping")
if update_detected:
self.data["latest"] = version
@@ -88,7 +90,7 @@ class ReleaseCycle:
self.updated = True
def __str__(self) -> str:
return self.name
return self.product_name + '#' + self.name
class Product:
@@ -110,7 +112,7 @@ class Product:
with self.versions_path.open() as versions_file:
self.versions = json.loads(versions_file.read())
self.releases = [ReleaseCycle(release) for release in self.data["releases"]]
self.releases = [ReleaseCycle(name, release) for release in self.data["releases"]]
self.updated = False
self.unmatched_versions = {}
@@ -118,7 +120,7 @@ class Product:
for release in self.releases:
latest = release.latest()
if release.matched and latest not in self.versions:
logging.info(f"latest version {latest} for {release.name} not found in {self.versions_path}")
logging.info(f"latest version {latest} for {release} not found in {self.versions_path}")
def process_version(self, version: str, date_str: str) -> None:
date = datetime.date.fromisoformat(date_str)
@@ -147,14 +149,7 @@ class Product:
product_file.write("\n")
def github_output(message: str) -> None:
    """Append *message* verbatim to the file named by the GITHUB_OUTPUT
    environment variable; when the variable is unset or empty, only the
    debug log line is emitted."""
    logging.debug(f"GITHUB_OUTPUT += {message.strip()}")
    output_path = os.getenv("GITHUB_OUTPUT")
    if not output_path:
        return
    with open(output_path, 'a') as output_file:  # NOQA: PTH123
        output_file.write(message)
def update_product(name: str, product_dir: Path, releases_dir: Path) -> None:
def update_product(name: str, product_dir: Path, releases_dir: Path, output: GitHubOutput) -> None:
versions_path = releases_dir / f"{name}.json"
if not versions_path.exists():
logging.debug(f"Skipping {name}, {versions_path} does not exist")
@@ -169,14 +164,14 @@ def update_product(name: str, product_dir: Path, releases_dir: Path) -> None:
logging.info(f"Updating {product.product_path}")
product.write()
# Print all unmatched versions released in the last 30 days
# List all unmatched versions released in the last 30 days
if len(product.unmatched_versions) != 0:
for version, date in product.unmatched_versions.items():
today = datetime.datetime.now(tz=datetime.timezone.utc).date()
days_since_release = (today - date).days
if days_since_release < 30:
logging.warning(f"{name}:{version} ({date}) not included")
github_output(f"{name}:{version} ({date})\n")
output.println(f"{name}:{version} ({date})")
if __name__ == "__main__":
@@ -196,13 +191,11 @@ if __name__ == "__main__":
# Example of dumping with aliases: https://github.com/endoflife-date/endoflife.date/pull/4368.
RoundTripRepresenter.ignore_aliases = lambda x, y: True # NOQA: ARG005
# See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#example-of-a-multiline-string
github_output("warning<<$EOF\n")
products_dir = Path(args.product_dir)
product_names = [args.product] if args.product else [p.stem for p in products_dir.glob("*.md")]
for product_name in product_names:
logging.debug(f"Processing {product_name}")
update_product(product_name, products_dir, Path(args.data_dir))
github_output("$EOF")
github_output = GitHubOutput("warning")
with github_output:
for product_name in product_names:
logging.debug(f"Processing {product_name}")
update_product(product_name, products_dir, Path(args.data_dir), github_output)

47
src/common/gha.py Normal file
View File

@@ -0,0 +1,47 @@
import logging
import os
from base64 import b64encode
"""See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions."""
class GitHubOutput:
    """Accumulates a multiline value for a single GitHub Actions output.

    Create with the output name, use as a context manager, append lines with
    ``println()``; on exit the accumulated value is written to the file named
    by the GITHUB_OUTPUT environment variable using the heredoc-style
    multiline syntax
    (https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#multiline-strings).
    Outside of GitHub Actions (GITHUB_OUTPUT unset) the command is only logged.
    """

    def __init__(self, name: str) -> None:
        self.name = name  # output variable name, e.g. "warning"
        self.value = ""  # accumulated lines, each terminated by "\n"

    def __enter__(self) -> "GitHubOutput":
        # Return self so `with GitHubOutput(...) as output:` works
        # (returning None would leave the `as` target unusable).
        return self

    def println(self, value: str = "") -> None:
        """Append *value* plus a trailing newline to the accumulated output."""
        self.value += value + "\n"

    def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
        # Flush even when the with-body raised: a partial output is still
        # more useful to the workflow than none at all.
        var_exists = "GITHUB_OUTPUT" in os.environ
        # Random delimiter so the value cannot terminate the heredoc early.
        delimiter = b64encode(os.urandom(16)).decode()
        value = f"{delimiter}\n{self.value}\n{delimiter}"
        command = f"{self.name}<<{value}"
        logging.info(f"GITHUB_OUTPUT (exists={var_exists}):\n{command}")
        if var_exists:
            with open(os.environ["GITHUB_OUTPUT"], 'a') as github_output_var:  # NOQA: PTH123
                print(command, file=github_output_var)
class GitHubStepSummary:
    """Accumulates Markdown lines for the GitHub Actions job summary.

    Use as a context manager and append lines with ``println()``; on exit the
    content is appended to the file named by the GITHUB_STEP_SUMMARY
    environment variable. Outside of GitHub Actions the content is only logged.
    """

    def __init__(self) -> None:
        self.value = ""  # accumulated Markdown, each line terminated by "\n"

    def __enter__(self) -> "GitHubStepSummary":
        # Return self so `with GitHubStepSummary() as summary:` works
        # (returning None would leave the `as` target unusable).
        return self

    def println(self, value: str = "") -> None:
        """Append *value* plus a trailing newline to the summary."""
        self.value += value + "\n"

    def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
        # Flush even on error so the partial summary is still visible.
        var_exists = "GITHUB_STEP_SUMMARY" in os.environ
        logging.info(f"GITHUB_STEP_SUMMARY (exists={var_exists}):\n{self.value}")
        if var_exists:
            with open(os.environ["GITHUB_STEP_SUMMARY"], 'a') as github_step_summary:  # NOQA: PTH123
                print(self.value, file=github_step_summary)

154
update.py
View File

@@ -4,113 +4,99 @@ import os
import subprocess
import sys
import time
from base64 import b64encode
from pathlib import Path
from deepdiff import DeepDiff
def github_output(name: str, value: str) -> None:
    """Write a ``name=value`` workflow command (heredoc-style for multiline
    values) to the GITHUB_OUTPUT file; outside of GitHub Actions only log
    what would have been written."""
    if "GITHUB_OUTPUT" not in os.environ:
        logging.debug(f"GITHUB_OUTPUT does not exist, but would have written: {name}={value.strip()}")
        return

    if "\n" not in value:
        command = f"{name}={value}"
    else:
        # https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#multiline-strings
        # Random delimiter so the payload cannot terminate the heredoc early.
        delimiter = b64encode(os.urandom(16)).decode()
        value = f"{delimiter}\n{value}\n{delimiter}"
        command = f"{name}<<{value}"

    with open(os.environ["GITHUB_OUTPUT"], 'a') as github_output_var:  # NOQA: PTH123
        print(command, file=github_output_var)
    logging.debug(f"Wrote to GITHUB_OUTPUT: {name}={value.strip()}")
def add_summary_line(line: str) -> None:
    """Append one Markdown line to the job summary file named by the
    GITHUB_STEP_SUMMARY environment variable; outside of GitHub Actions
    only log what would have been written."""
    summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
    if summary_path is None:
        logging.debug(f"GITHUB_STEP_SUMMARY does not exist, but would have written: {line}")
        return
    with open(summary_path, 'a') as summary_file:  # NOQA: PTH123
        print(line, file=summary_file)
from src.common.gha import GitHubOutput, GitHubStepSummary
SRC_DIR = Path('src')
DATA_DIR = Path('releases')
logging.basicConfig(format=logging.BASIC_FORMAT, level=logging.INFO)
# Run scripts
scripts = sorted([SRC_DIR / file for file in os.listdir(SRC_DIR) if file.endswith('.py')])
some_script_failed = False
def run_scripts(summary: GitHubStepSummary) -> bool:
summary.println("## Script execution summary\n")
summary.println("| Name | Duration | Succeeded |")
summary.println("|------|----------|-----------|")
add_summary_line("## Script execution summary\n")
add_summary_line("| Name | Duration | Succeeded |")
add_summary_line("|------|----------|-----------|")
for script in scripts:
logging.info(f"start running {script}")
scripts = sorted([SRC_DIR / file for file in os.listdir(SRC_DIR) if file.endswith('.py')])
failure = False
for script in scripts:
logging.info(f"start running {script}")
start = time.perf_counter()
child = subprocess.run([sys.executable, script]) # timeout handled in subscripts
elapsed_seconds = time.perf_counter() - start
start = time.perf_counter()
child = subprocess.run([sys.executable, script]) # timeout handled in subscripts
elapsed_seconds = time.perf_counter() - start
if child.returncode != 0:
some_script_failed = True
add_summary_line(f"| {script} | {elapsed_seconds:.2f}s | ❌ |")
logging.error(f"Error while running {script} after {elapsed_seconds:.2f}s, update will only be partial")
else:
logging.info(f"Finished running {script}, took {elapsed_seconds:.2f}s")
add_summary_line(f"| {script} | {elapsed_seconds:.2f}s | ✅ |")
if child.returncode != 0:
failure = True
summary.println(f"| {script} | {elapsed_seconds:.2f}s | ❌ |")
logging.error(f"Error while running {script} after {elapsed_seconds:.2f}s, update will only be partial")
else:
logging.info(f"Finished running {script}, took {elapsed_seconds:.2f}s")
summary.println(f"| {script} | {elapsed_seconds:.2f}s | ✅ |")
# Generate commit message
subprocess.run('git add --all', timeout=10, check=True, shell=True) # to also get new files in git diff
git_diff = subprocess.run('git diff --name-only --staged', capture_output=True, timeout=10, check=True, shell=True)
updated_files = [Path(file) for file in git_diff.stdout.decode('utf-8').split('\n')]
updated_product_files = sorted([file for file in updated_files if file.parent == DATA_DIR])
logging.info(f"Updated product files: {[file.name for file in updated_product_files]}")
summary.println("")
return failure
add_summary_line("## Update summary\n")
if updated_product_files:
# get modified files content
new_files_content = {}
for path in updated_product_files:
with path.open() as file:
new_files_content[path] = json.load(file)
# get original files content
old_files_content = {}
subprocess.run('git stash --all --quiet', timeout=10, check=True, shell=True)
def get_updated_products() -> list[Path]:
    """Return the sorted paths under DATA_DIR that git reports as changed.

    Everything is staged first so that brand-new files also appear in
    `git diff --staged`.
    """
    subprocess.run('git add --all', timeout=10, check=True, shell=True)  # to also get new files in git diff
    diff_result = subprocess.run('git diff --name-only --staged', capture_output=True, timeout=10, check=True, shell=True)
    changed_paths = (Path(line) for line in diff_result.stdout.decode('utf-8').split('\n'))
    return sorted(path for path in changed_paths if path.parent == DATA_DIR)
def load_products_json(updated_product_files: list[Path]) -> dict[Path, dict]:
files_content = {}
for path in updated_product_files:
if path.exists():
with path.open() as file:
old_files_content[path] = json.load(file)
else: # new file
old_files_content[path] = {}
subprocess.run('git stash pop --quiet', timeout=10, check=True, shell=True)
files_content[path] = json.load(file)
else: # new or deleted file
files_content[path] = {}
# Generate commit message
product_names = ', '.join([path.stem for path in updated_product_files])
commit_message = f"🤖: {product_names}\n\n"
add_summary_line(f"Updated {len(updated_product_files)} products: {product_names}.")
return files_content
for path in updated_product_files:
add_summary_line(f"### {path.stem}\n")
commit_message += f"{path.stem}:\n"
diff = DeepDiff(old_files_content[path], new_files_content[path], ignore_order=True)
for line in diff.pretty().split('\n'):
add_summary_line(f"- {line}")
commit_message += f"- {line}\n"
logging.info(f"{path.stem}: {line}")
def generate_commit_message(old_content: dict[Path, dict], new_content: dict[Path, dict], summary: GitHubStepSummary) -> None:
product_names = ', '.join([path.stem for path in old_content])
summary.println(f"Updated {len(old_content)} products: {product_names}.\n")
commit_message += "\n"
add_summary_line("")
commit_message = GitHubOutput('commit_message')
with commit_message:
commit_message.println(f"🤖: {product_names}\n")
github_output('commit_message', commit_message)
for path in old_content:
product_name = path.stem
summary.println(f"### {product_name}\n")
commit_message.println(f"{product_name}:")
else:
add_summary_line("No update")
diff = DeepDiff(old_content[path], new_content[path], ignore_order=True)
for line in diff.pretty().split('\n'):
summary.println(f"- {line}")
commit_message.println(f"- {line}")
logging.info(f"{product_name}: {line}")
commit_message.println("")
summary.println("")
# Entry point: run every update script, then summarise what changed and
# expose the result via the GitHub step summary.
logging.basicConfig(format=logging.BASIC_FORMAT, level=logging.INFO)

step_summary = GitHubStepSummary()
with step_summary:
    some_script_failed = run_scripts(step_summary)
    updated_products = get_updated_products()

    step_summary.println("## Update summary\n")
    if updated_products:
        # Read the post-update file contents, stash to restore the pre-update
        # state and read it too, then diff old vs new for the commit message.
        # NOTE(review): order matters — stash pop must run before anything
        # else touches the working tree.
        new_files_content = load_products_json(updated_products)
        subprocess.run('git stash --all --quiet', timeout=10, check=True, shell=True)
        old_files_content = load_products_json(updated_products)
        subprocess.run('git stash pop --quiet', timeout=10, check=True, shell=True)
        generate_commit_message(old_files_content, new_files_content, step_summary)
    else:
        step_summary.println("No update")

# Fail the job if any script failed, but only after the summary was written.
sys.exit(1 if some_script_failed else 0)