Support release-level data (#297)

Support retrieving and updating generic release-level data, such as support and eol dates. The JSON format has been changed accordingly to add a new top-level `releases` key.

The `aws-lambda.py` script has been updated to make use of this new feature.
This commit is contained in:
Marc Wrobel
2024-02-11 16:57:59 +01:00
committed by GitHub
parent a0ba2d687e
commit b6f14c8d61
231 changed files with 500 additions and 152 deletions

View File

@@ -1,16 +1,7 @@
from bs4 import BeautifulSoup
from common import dates, endoflife, http, releasedata
"""Fetches AWS lambda runtimes from https://docs.aws.amazon.com.
This script does not retrieve release dates, as they are only available in release announcements.
Instead, it uses the release dates from the endoflife.date product file, or alternatively the
date the release was first detected (or the current date if none is found).
If one day release dates are available in the AWS documentation, it would be better to make use
them though. Note that this would also be unnecessary if it was possible to disable release/latest
release dates updates in the latest.py script."""
"""Fetches AWS lambda runtimes with their support / EOL dates from https://docs.aws.amazon.com."""
with releasedata.ProductData("aws-lambda") as product_data:
product_frontmatter = endoflife.ProductFrontmatter(product_data.name)
@@ -18,17 +9,31 @@ with releasedata.ProductData("aws-lambda") as product_data:
soup = BeautifulSoup(response.text, features="html5lib")
for table in soup.find_all("table"):
headers = [th.get_text().strip().lower() for th in table.find("thead").find_all("tr")[1].find_all("th")]
if "identifier" not in headers:
continue
table_name = table.find("thead").find_all("tr")[0].find("th").get_text().strip().lower()
if table_name != "supported runtimes" and table_name != "deprecated runtimes":
message = f"unexpected table '{table_name}'"
raise ValueError(message)
headers = [th.get_text().strip().lower() for th in table.find("thead").find_all("tr")[1].find_all("th")]
if "identifier" not in headers or "deprecation date" not in headers or "block function update" not in headers:
message = f"table '{table_name}' does not contain the expected headers"
raise ValueError(message)
is_supported_table = table_name == "supported runtimes"
identifier_index = headers.index("identifier")
deprecation_date_index = headers.index("deprecation date")
block_function_update_index = headers.index("block function update")
for row in table.find("tbody").find_all("tr"):
cells = row.find_all("td")
identifier = cells[identifier_index].get_text().strip()
deprecation_date_str = cells[deprecation_date_index].get_text().strip()
deprecation_date = dates.parse_date(deprecation_date_str) if deprecation_date_str else None
block_function_update_str = cells[block_function_update_index].get_text().strip()
block_function_update = dates.parse_date(block_function_update_str) if block_function_update_str else None
date = product_frontmatter.get_release_date(identifier) # use the product releaseDate if available
if date is None:
date = dates.today() # else use today's date
product_data.declare_version(identifier, date)
release = product_data.get_release(identifier)
# if no date is available, use True for supported runtimes and False for deprecated ones
release.set_support(deprecation_date if deprecation_date else is_supported_table)
# if no date is available, use False for supported runtimes and True for deprecated ones
release.set_eol(block_function_update if block_function_update else not is_supported_table)

View File

@@ -86,8 +86,14 @@ class ProductFrontmatter:
return configs
def get_releases(self) -> list[dict]:
return self.data.get("releases", [])
def get_release_names(self) -> list[str]:
return [release["releaseCycle"] for release in self.get_releases()]
def get_release_date(self, release_cycle: str) -> datetime | None:
for release in self.data["releases"]:
for release in self.get_releases():
if release["releaseCycle"] == release_cycle:
return release["releaseDate"]
return None

View File

@@ -16,6 +16,38 @@ class ProductUpdateError(Exception):
"""Custom exceptions raised when unexpected errors occur during product updates."""
class ProductRelease:
def __init__(self, product: str, data: dict = None) -> None:
self.product = product
self.data = data if data else {}
@staticmethod
def of(product: str, name: str) -> "ProductRelease":
return ProductRelease(product, { "name": name })
def name(self) -> str:
return self.data["name"]
def set_support(self, new_value: datetime | bool) -> None:
self.set_field("support", new_value)
def set_eol(self, new_value: datetime | bool) -> None:
self.set_field("eol", new_value)
def set_field(self, field: str, new_value: any) -> None:
new_value = new_value.strftime("%Y-%m-%d") if isinstance(new_value, datetime) else new_value
old_value = self.data.get(field, None)
if old_value != new_value:
self.data[field] = new_value
if old_value:
logging.info(f"updated '{field}' in {self} from {old_value} to {new_value}")
else:
logging.info(f"set '{field}' in {self} to {new_value}")
def __repr__(self) -> str:
return f"{self.product}#{self.name()}"
class ProductVersion:
def __init__(self, product: str, data: dict) -> None:
self.product = product
@@ -45,6 +77,7 @@ class ProductData:
def __init__(self, name: str) -> None:
self.name: str = name
self.versions_path: Path = VERSIONS_PATH / f"{name}.json"
self.releases = {}
self.versions: dict[str, ProductVersion] = {}
def __enter__(self) -> "ProductData":
@@ -67,13 +100,21 @@ class ProductData:
raise ProductUpdateError(message) from exc_value
logging.info("updating %s data",self.versions_path)
# sort by date then version (desc)
ordered_releases = sorted(self.releases.values(), key=lambda v: v.name(), reverse=True)
ordered_versions = sorted(self.versions.values(), key=lambda v: (v.date(), v.name()), reverse=True)
with self.versions_path.open("w") as f:
f.write(json.dumps({
"releases": {release.name(): release.data for release in ordered_releases},
"versions": {version.name(): version.data for version in ordered_versions},
}, indent=2))
def get_release(self, release: str) -> ProductRelease:
if release not in self.releases:
logging.info(f"adding release {release} to {self}")
self.releases[release] = ProductRelease.of(self.name, release)
return self.releases[release]
def get_version(self, version: str) -> ProductVersion:
return self.versions[version] if version in self.versions else None