- Add 'disabled' field to AutoConfig class - Skip disabled scripts in update-release-data.py unless force flag is used
211 lines
9.4 KiB
Python
211 lines
9.4 KiB
Python
import argparse
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from deepdiff import DeepDiff
|
|
|
|
from src.common.endoflife import AutoConfig, ProductFrontmatter, list_products
|
|
from src.common.gha import GitHubGroup, GitHubOutput, GitHubStepSummary
|
|
from src.common.releasedata import DATA_DIR, SRC_DIR
|
|
|
|
|
|
class ScriptExecutionSummary:
|
|
def __init__(self) -> None:
|
|
self.success_by_product = {}
|
|
self.success_by_script = {}
|
|
self.durations_by_product = {}
|
|
self.durations_by_script = {}
|
|
self.scripts_by_product = {}
|
|
self.products_by_script = {}
|
|
|
|
def register(self, script: str, product: str, duration: float, success: bool) -> None:
|
|
self.success_by_product[product] = self.success_by_product.get(product, True) and success
|
|
self.success_by_script[script] = self.success_by_script.get(script, True) and success
|
|
self.durations_by_product[product] = self.durations_by_product.get(product, 0) + duration
|
|
self.durations_by_script[script] = self.durations_by_script.get(script, 0) + duration
|
|
self.scripts_by_product[product] = self.scripts_by_product.get(product, []) + [script]
|
|
self.products_by_script[script] = self.products_by_script.get(script, []) + [product]
|
|
|
|
def print_summary(self, summary: GitHubStepSummary, min_duration: float = 3) -> None:
|
|
summary.println("## Script execution summary\n")
|
|
summary.println(f"Executions below {min_duration} seconds are hidden except in case of failure.\n")
|
|
summary.println("### By products\n")
|
|
summary.println("| Name | Duration | Scripts | Succeeded |")
|
|
summary.println("|------|----------|---------|-----------|")
|
|
for product, duration in sorted(self.durations_by_product.items(), key=lambda x: x[1], reverse=True):
|
|
if duration >= min_duration or not self.success_by_product[product]:
|
|
scripts = ', '.join(self.scripts_by_product[product])
|
|
success = '✅' if self.success_by_product[product] else '❌'
|
|
summary.println(f"| {product} | {duration:.2f}s | {scripts} | {success} |")
|
|
|
|
summary.println("\n### By scripts\n")
|
|
summary.println("| Name | Duration | #Products | Succeeded |")
|
|
summary.println("|------|----------|-----------|-----------|")
|
|
for script, duration in sorted(self.durations_by_script.items(), key=lambda x: x[1], reverse=True):
|
|
if duration >= min_duration or not self.products_by_script[script]:
|
|
product_count = len(self.products_by_script[script])
|
|
success = '✅' if self.success_by_script[script] else '❌'
|
|
summary.println(f"| {script} | {duration:.2f}s | {product_count} | {success} |")
|
|
|
|
summary.println("")
|
|
|
|
def any_failure(self) -> bool:
|
|
return not all(self.success_by_product.values())
|
|
|
|
|
|
def install_playwright() -> None:
|
|
with GitHubGroup("Install Playwright"):
|
|
logging.info("Installing Playwright")
|
|
subprocess.run('playwright install chromium', timeout=120, check=True, shell=True)
|
|
logging.info("Playwright installed")
|
|
|
|
|
|
def __delete_data(product: ProductFrontmatter) -> None:
|
|
release_data_path = Path(__file__).resolve().parent / DATA_DIR / f"{product.name}.json"
|
|
if not release_data_path.exists() or product.is_auto_update_cumulative():
|
|
return
|
|
|
|
release_data_path.unlink()
|
|
logging.info(f"deleted {release_data_path} before running scripts")
|
|
|
|
|
|
def __revert_data(product: ProductFrontmatter) -> None:
|
|
release_data_path = Path(__file__).resolve().parent / DATA_DIR / f"{product.name}.json"
|
|
# check=False because the command fails if the file did not exist before
|
|
subprocess.run(f'git checkout HEAD -- {release_data_path}', timeout=10, check=False, shell=True)
|
|
logging.warning(f"reverted changes in {release_data_path}")
|
|
|
|
|
|
def __run_script(product: ProductFrontmatter, config: AutoConfig, summary: ScriptExecutionSummary) -> bool:
|
|
script = Path(__file__).resolve().parent / SRC_DIR / config.script
|
|
|
|
logging.info(f"start running {script} for {config}")
|
|
start = time.perf_counter()
|
|
|
|
# timeout is handled in child scripts
|
|
script_args = [sys.executable, script, "-p", product.path, "-m", str(config.method), "-u", str(config.url)]
|
|
script_args = script_args + ["-v"] if logging.getLogger().isEnabledFor(logging.DEBUG) else script_args
|
|
child = subprocess.run(script_args)
|
|
|
|
success = child.returncode == 0
|
|
elapsed_seconds = time.perf_counter() - start
|
|
|
|
summary.register(script.stem, product.name, elapsed_seconds, success)
|
|
logging.log(logging.ERROR if not success else logging.INFO,
|
|
f"ran {script} for {config}, took {elapsed_seconds:.2f}s (success={success})")
|
|
|
|
return success
|
|
|
|
|
|
def run_scripts(summary: GitHubStepSummary, products: list[ProductFrontmatter], force: bool = False) -> bool:
|
|
exec_summary = ScriptExecutionSummary()
|
|
|
|
for product in products:
|
|
configs = product.auto_configs()
|
|
if not configs:
|
|
continue # skip products without auto configs
|
|
|
|
if product.is_auto_update_disabled() and not force:
|
|
logging.info(f"skipping {product.name} as auto update is disabled")
|
|
continue
|
|
|
|
# Add default configs
|
|
configs = [AutoConfig(product.name, {"_copy_product_releases": ""})] + configs
|
|
configs = configs + [AutoConfig(product.name, {"_remove_invalid_releases": ""})]
|
|
|
|
with GitHubGroup(product.name):
|
|
try:
|
|
__delete_data(product)
|
|
for config in configs:
|
|
if config.is_disabled() and not force:
|
|
logging.info(f"skipping script {config.script} for {product.name} as it is disabled")
|
|
continue
|
|
|
|
success = __run_script(product, config, exec_summary)
|
|
if not success:
|
|
__revert_data(product)
|
|
break # stop running scripts for this product
|
|
|
|
except BaseException:
|
|
logging.exception(f"Skipping {product.name}, there was an error while running its scripts")
|
|
|
|
exec_summary.print_summary(summary)
|
|
return exec_summary.any_failure()
|
|
|
|
|
|
def get_updated_products() -> list[Path]:
|
|
subprocess.run('git add --all', timeout=10, check=True, shell=True) # to also get new files in git diff
|
|
git_diff = subprocess.run('git diff --name-only --staged', capture_output=True, timeout=10, check=True, shell=True)
|
|
updated_files = [Path(file) for file in git_diff.stdout.decode('utf-8').split('\n')]
|
|
return sorted([file for file in updated_files if file.parent == DATA_DIR])
|
|
|
|
|
|
def load_products_json(updated_product_files: list[Path]) -> dict[Path, dict]:
|
|
files_content = {}
|
|
|
|
for path in updated_product_files:
|
|
if path.exists():
|
|
with path.open() as file:
|
|
files_content[path] = json.load(file)
|
|
else: # new or deleted file
|
|
files_content[path] = {}
|
|
|
|
return files_content
|
|
|
|
|
|
def generate_commit_message(old_content: dict[Path, dict], new_content: dict[Path, dict], summary: GitHubStepSummary) -> None:
|
|
product_names = ', '.join([path.stem for path in old_content])
|
|
summary.println(f"Updated {len(old_content)} products: {product_names}.\n")
|
|
|
|
commit_message = GitHubOutput('commit_message')
|
|
with commit_message:
|
|
commit_message.println(f"🤖: {product_names}\n")
|
|
|
|
for path in old_content:
|
|
product_name = path.stem
|
|
summary.println(f"### {product_name}\n")
|
|
commit_message.println(f"{product_name}:")
|
|
|
|
diff = DeepDiff(old_content[path], new_content[path], ignore_order=True, verbose_level=2)
|
|
for line in diff.pretty().split('\n'):
|
|
summary.println(f"- {line}")
|
|
commit_message.println(f"- {line}")
|
|
logging.info(f"{product_name}: {line}")
|
|
|
|
commit_message.println("")
|
|
summary.println("")
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description='Update product releases.')
|
|
parser.add_argument('product', nargs='?', help='restrict update to the given product')
|
|
parser.add_argument('-p', '--product-dir', required=True, help='path to the product directory')
|
|
parser.add_argument('-v', '--verbose', action='store_true', help='enable verbose logging')
|
|
parser.add_argument('-f', '--force', action='store_true', help='force update even if auto update is disabled')
|
|
args = parser.parse_args()
|
|
|
|
logging.basicConfig(format=logging.BASIC_FORMAT, level=(logging.DEBUG if args.verbose else logging.INFO))
|
|
install_playwright()
|
|
|
|
products_dir = Path(args.product_dir)
|
|
products_list = list_products(products_dir, args.product)
|
|
|
|
with GitHubStepSummary() as step_summary:
|
|
some_script_failed = run_scripts(step_summary, products_list, force=args.force)
|
|
updated_products = get_updated_products()
|
|
|
|
step_summary.println("## Update summary\n")
|
|
if updated_products:
|
|
new_files_content = load_products_json(updated_products)
|
|
subprocess.run('git stash --all --quiet', timeout=10, check=True, shell=True)
|
|
old_files_content = load_products_json(updated_products)
|
|
subprocess.run('git stash pop --quiet', timeout=10, check=True, shell=True)
|
|
generate_commit_message(old_files_content, new_files_content, step_summary)
|
|
else:
|
|
step_summary.println("No update")
|
|
|
|
sys.exit(1 if some_script_failed else 0)
|