Remove invalid releases automatically (#466)
Automatically remove empty releases or releases which are released in the future, regardless of the product. Also refactored a bit releasedata.py to use better names and clarify types.
This commit is contained in:
@@ -74,7 +74,7 @@ class ProductRelease:
|
||||
def is_empty(self) -> bool:
|
||||
return len(self.data) == 1 # only the name is set
|
||||
|
||||
def is_released_after(self, date: datetime) -> bool:
|
||||
def was_released_after(self, date: datetime) -> bool:
|
||||
release_date = self.get_release_date()
|
||||
return release_date and release_date > date
|
||||
|
||||
@@ -110,14 +110,14 @@ class ProductVersion:
|
||||
class ProductData:
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name: str = name
|
||||
self.versions_path: Path = DATA_DIR / f"{name}.json"
|
||||
self.releases = {}
|
||||
self.path: Path = DATA_DIR / f"{name}.json"
|
||||
self.releases: dict[str, ProductRelease] = {}
|
||||
self.versions: dict[str, ProductVersion] = {}
|
||||
self.updated = False
|
||||
self.updated: bool = False
|
||||
|
||||
def __enter__(self) -> "ProductData":
|
||||
if self.versions_path.is_file():
|
||||
with self.versions_path.open() as f:
|
||||
if self.path.is_file():
|
||||
with self.path.open() as f:
|
||||
json_data = json.load(f)
|
||||
for json_version in json_data.get("versions", {}).values():
|
||||
version = ProductVersion(self.name, json_version)
|
||||
@@ -125,9 +125,9 @@ class ProductData:
|
||||
for json_release in json_data.get("releases", {}).values():
|
||||
release = ProductRelease(self.name, json_release)
|
||||
self.releases[release.name()] = release
|
||||
logging.info(f"loaded data for {self} from {self.versions_path}")
|
||||
logging.info(f"loaded data for {self} from {self.path}")
|
||||
else:
|
||||
logging.info(f"no data found for {self} at {self.versions_path}")
|
||||
logging.info(f"no data found for {self} at {self.path}")
|
||||
|
||||
return self
|
||||
|
||||
@@ -143,48 +143,49 @@ class ProductData:
|
||||
logging.error(message)
|
||||
raise ProductUpdateError(message)
|
||||
|
||||
logging.info("updating %s data",self.versions_path)
|
||||
logging.info("updating %s data", self.path)
|
||||
ordered_releases = sorted(self.releases.values(), key=lambda v: v.name(), reverse=True)
|
||||
ordered_versions = sorted(self.versions.values(), key=lambda v: (v.date(), v.name()), reverse=True)
|
||||
with self.versions_path.open("w") as f:
|
||||
with self.path.open("w") as f:
|
||||
f.write(json.dumps({
|
||||
"releases": {release.name(): release.data for release in ordered_releases},
|
||||
"versions": {version.name(): version.data for version in ordered_versions},
|
||||
}, indent=2))
|
||||
|
||||
def get_release(self, release: str) -> ProductRelease:
|
||||
if release not in self.releases:
|
||||
logging.info(f"adding release {release} to {self}")
|
||||
self.releases[release] = ProductRelease.of(self.name, release)
|
||||
def get_release(self, release_name: str) -> ProductRelease:
|
||||
if release_name not in self.releases:
|
||||
logging.info(f"adding release {release_name} to {self}")
|
||||
self.releases[release_name] = ProductRelease.of(self.name, release_name)
|
||||
|
||||
self.updated = True
|
||||
return self.releases[release]
|
||||
return self.releases[release_name]
|
||||
|
||||
def remove_release(self, release: str) -> None:
|
||||
if release not in self.releases:
|
||||
logging.warning(f"release {release} cannot be removed as it does not exist for {self}")
|
||||
def remove_release(self, release_name: str) -> None:
|
||||
if release_name not in self.releases:
|
||||
logging.warning(f"release {release_name} cannot be removed as it does not exist for {self}")
|
||||
return
|
||||
|
||||
logging.info(f"removing release {release} ({self.releases.pop(release)}) from {self}")
|
||||
|
||||
def get_version(self, version: str) -> ProductVersion:
|
||||
return self.versions[version] if version in self.versions else None
|
||||
|
||||
def declare_version(self, version: str, date: datetime) -> None:
|
||||
logging.info(f"removing release {release_name} ({self.releases.pop(release_name)}) from {self}")
|
||||
self.updated = True
|
||||
if version in self.versions and self.versions[version].date() != date:
|
||||
logging.info(f"overwriting {version} ({self.get_version(version).date()} -> {date}) for {self}")
|
||||
self.versions[version].replace_date(date)
|
||||
|
||||
def get_version(self, version_name: str) -> ProductVersion:
|
||||
return self.versions[version_name] if version_name in self.versions else None
|
||||
|
||||
def declare_version(self, version_name: str, versions_date: datetime) -> None:
|
||||
self.updated = True
|
||||
if version_name in self.versions and self.versions[version_name].date() != versions_date:
|
||||
logging.info(f"overwriting {version_name} ({self.get_version(version_name).date()} -> {versions_date}) for {self}")
|
||||
self.versions[version_name].replace_date(versions_date)
|
||||
else:
|
||||
logging.info(f"adding version {version} ({date}) to {self}")
|
||||
self.versions[version] = ProductVersion.of(self.name, version, date)
|
||||
logging.info(f"adding version {version_name} ({versions_date}) to {self}")
|
||||
self.versions[version_name] = ProductVersion.of(self.name, version_name, versions_date)
|
||||
|
||||
def remove_version(self, version: str) -> None:
|
||||
if version not in self.versions:
|
||||
logging.warning(f"version {version} cannot be removed as it does not exist for {self}")
|
||||
def remove_version(self, version_name: str) -> None:
|
||||
if version_name not in self.versions:
|
||||
logging.warning(f"version {version_name} cannot be removed as it does not exist for {self}")
|
||||
return
|
||||
|
||||
logging.info(f"removing version {version} ({self.versions.pop(version)}) from {self}")
|
||||
logging.info(f"removing version {version_name} ({self.versions.pop(version_name)}) from {self}")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return self.name
|
||||
|
||||
Reference in New Issue
Block a user