Improve scripts execution orchestration (#299)

Until now products could declare multiple auto-update methods, but they all had to be of the same kind.
For example if you used the git auto-update method, you could not use an additional github_releases or custom auto-update method.
This is an issue as it prevents us to extend the auto-update process, for example by having a product using the 'git' auto-update method to retrieve all the versions, and a custom script to retrieve support and EOL dates.

This improve the scripts execution orchestration to be able to support auto configurations using a mix of methods, meaning:

- multiple kind of methods, such as git and github_release,
- or multiple custom methods.

A side-effect of those changes is that now a failure in a generic script does not cancel the update of subsequent products.

Another side-effect, unwanted this time, is that now custom scripts managing multiple products, such as apple.py, are now executed multiple times instead of once.
This commit is contained in:
Marc Wrobel
2024-02-11 15:28:26 +01:00
committed by GitHub
parent 2b6e21786e
commit a0ba2d687e
17 changed files with 254 additions and 193 deletions

View File

@@ -1,3 +1,4 @@
import itertools
import logging
import os
import re
@@ -17,11 +18,14 @@ PRODUCTS_PATH = Path(os.environ.get("PRODUCTS_PATH", "website/products"))
class AutoConfig:
def __init__(self, method: str, config: dict) -> None:
self.method = method
self.url = config[method]
def __init__(self, product: str, config: dict) -> None:
self.product = product
self.method = next(key for key in config if key not in ("template", "regex", "regex_exclude"))
self.url = config[self.method]
self.version_template = Template(config.get("template", DEFAULT_VERSION_TEMPLATE))
self.script = f"{self.url}.py" if self.method == "custom" else f"{self.method}.py"
regexes_include = config.get("regex", DEFAULT_VERSION_REGEX)
regexes_include = regexes_include if isinstance(regexes_include, list) else [regexes_include]
self.include_version_patterns = [re.compile(r) for r in regexes_include]
@@ -45,6 +49,9 @@ class AutoConfig:
def render(self, match: re.Match) -> str:
return self.version_template.render(**match.groupdict())
def __repr__(self) -> str:
return f"{self.product}#{self.method}({self.url})"
class ProductFrontmatter:
def __init__(self, name: str) -> None:
@@ -59,17 +66,23 @@ class ProductFrontmatter:
else:
logging.warning(f"no product data found for {self.name} at {self.path}")
def get_auto_configs(self, method: str) -> list[AutoConfig]:
def has_auto_configs(self) -> bool:
return self.data and "methods" in self.data.get("auto", {})
def is_auto_update_cumulative(self) -> bool:
return self.data.get("auto", {}).get("cumulative", False)
def auto_configs(self, method_filter: str = None, url_filter: str = None) -> list[AutoConfig]:
configs = []
all_configs = self.data.get("auto", {}).get("methods", [])
for config in all_configs:
if method in config:
configs.append(AutoConfig(method, config))
configs_data = self.data.get("auto", {}).get("methods", [])
for config_data in configs_data:
config = AutoConfig(self.name, config_data)
if ((method_filter and config.method != method_filter)
or (url_filter and config.url != url_filter)):
continue
if len(configs) > 0 and len(configs) != len(all_configs):
message = f"mixed auto-update methods declared for {self.name}, this is not yet supported"
raise ValueError(message)
configs.append(config)
return configs
@@ -80,7 +93,7 @@ class ProductFrontmatter:
return None
def list_products(method: str, products_filter: str = None) -> list[ProductFrontmatter]:
def list_products(products_filter: str = None) -> list[ProductFrontmatter]:
"""Return a list of products that are using the same given update method."""
products = []
@@ -89,9 +102,12 @@ def list_products(method: str, products_filter: str = None) -> list[ProductFront
if products_filter and product_name != products_filter:
continue
product = ProductFrontmatter(product_name)
configs = product.get_auto_configs(method)
if len(configs) > 0:
products.append(product)
products.append(ProductFrontmatter(product_name))
return products
def list_configs(products_filter: str = None, methods_filter: str = None, urls_filter: str = None) -> list[AutoConfig]:
products = list_products(products_filter)
configs_by_product = [p.auto_configs(methods_filter, urls_filter) for p in products]
return list(itertools.chain.from_iterable(configs_by_product)) # flatten the list of lists

View File

@@ -45,3 +45,14 @@ class GitHubStepSummary:
if var_exists:
with open(os.environ["GITHUB_STEP_SUMMARY"], 'a') as github_step_summary: # NOQA: PTH123
print(self.value, file=github_step_summary)
class GitHubGroup:
def __init__(self, name: str) -> None:
self.name = name
def __enter__(self) -> None:
logging.info(f"::group::{self.name}")
def __exit__(self, exc_type: any, exc_value: any, traceback: any) -> None:
logging.info("::endgroup::")

View File

@@ -37,71 +37,53 @@ class ProductVersion:
def replace_date(self, date: datetime) -> None:
self.data["date"] = date.strftime("%Y-%m-%d")
def copy(self) -> "ProductVersion":
return ProductVersion(self.product, self.data.copy())
def __repr__(self) -> str:
return f"{self.product}#{self.name()} ({self.date()})"
class ProductData:
def __init__(self, name: str, cumulative_update: bool = False) -> None:
def __init__(self, name: str) -> None:
self.name: str = name
self.cumulative_update: bool = cumulative_update
self.versions_path: Path = VERSIONS_PATH / f"{name}.json"
self.versions: dict[str, ProductVersion] = {}
self.previous_versions: dict[str, ProductVersion] = {}
def __enter__(self) -> "ProductData":
logging.info(f"::group::{self}")
if self.versions_path.is_file():
with self.versions_path.open() as f:
for json_version in json.load(f)["versions"].values():
version = ProductVersion(self.name, json_version)
self.previous_versions[version.name()] = version
logging.info(f"loaded previous versions data for {self} from {self.versions_path}")
self.versions[version.name()] = version
logging.info(f"loaded versions data for {self} from {self.versions_path}")
else:
logging.info(f"no previous versions data found for {self} at {self.versions_path}")
if self.cumulative_update:
logging.info(f"cumulative update is enabled for {self}, will reuse previous versions data")
for name, version in self.previous_versions.items():
self.versions[name] = version.copy()
logging.info(f"no versions data found for {self} at {self.versions_path}")
return self
def __exit__(self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType]) -> None:
try:
if exc_value:
message = f"an unexpected error occurred while updating {self} data"
logging.error(message, exc_info=exc_value)
raise ProductUpdateError(message) from exc_value
if exc_value:
message = f"an unexpected error occurred while updating {self} data"
logging.error(message, exc_info=exc_value)
raise ProductUpdateError(message) from exc_value
logging.info("updating %s data",self)
# sort by date then version (desc)
ordered_versions = sorted(self.versions.values(), key=lambda v: (v.date(), v.name()), reverse=True)
with self.versions_path.open("w") as f:
f.write(json.dumps({
"versions": {version.name(): version.data for version in ordered_versions},
}, indent=2))
finally:
logging.info("::endgroup::")
logging.info("updating %s data",self.versions_path)
# sort by date then version (desc)
ordered_versions = sorted(self.versions.values(), key=lambda v: (v.date(), v.name()), reverse=True)
with self.versions_path.open("w") as f:
f.write(json.dumps({
"versions": {version.name(): version.data for version in ordered_versions},
}, indent=2))
def get_version(self, version: str) -> ProductVersion:
return self.versions[version] if version in self.versions else None
def get_previous_version(self, version: str) -> ProductVersion:
return self.previous_versions[version] if version in self.previous_versions else None
def declare_version(self, version: str, date: datetime) -> None:
if version in self.versions and self.versions[version].date() != date:
logging.info(f"overwriting {version} ({self.get_version(version).date()} -> {date}) for {self}")
self.versions[version].replace_date(date)
else:
logging.info(f"adding version {version} ({date}) to {self}")
self.versions[version] = ProductVersion.of(self, version, date)
self.versions[version] = ProductVersion.of(self.name, version, date)
def declare_versions(self, dates_by_version: dict[str, datetime]) -> None:
for (version, date) in dates_by_version.items():