[util] use functions for predicates
more lightweight and faster than classes
This commit is contained in:
@@ -957,115 +957,110 @@ def build_proxy_map(proxies, log=None):
|
||||
return proxies
|
||||
|
||||
|
||||
def build_predicate(predicates):
|
||||
def predicate_build(predicates):
|
||||
if not predicates:
|
||||
return true
|
||||
elif len(predicates) == 1:
|
||||
|
||||
if len(predicates) == 1:
|
||||
return predicates[0]
|
||||
return functools.partial(chain_predicates, predicates)
|
||||
|
||||
def chain(url, kwdict):
|
||||
for pred in predicates:
|
||||
if not pred(url, kwdict):
|
||||
return False
|
||||
return True
|
||||
return chain
|
||||
|
||||
|
||||
def chain_predicates(predicates, url, kwdict):
|
||||
for pred in predicates:
|
||||
if not pred(url, kwdict):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class RangePredicate():
|
||||
"""Predicate; True if the current index is in the given range(s)"""
|
||||
|
||||
def __init__(self, rangespec):
|
||||
self.ranges = ranges = self._parse(rangespec)
|
||||
self.index = 0
|
||||
|
||||
if ranges:
|
||||
# technically wrong, but good enough for now
|
||||
# and evaluating min/max for a large range is slow
|
||||
self.lower = min(r.start for r in ranges)
|
||||
self.upper = max(r.stop for r in ranges) - 1
|
||||
else:
|
||||
self.lower = 0
|
||||
self.upper = 0
|
||||
|
||||
def __call__(self, _url, _kwdict):
|
||||
self.index = index = self.index + 1
|
||||
|
||||
if index > self.upper:
|
||||
raise exception.StopExtraction()
|
||||
|
||||
for range in self.ranges:
|
||||
if index in range:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _parse(self, rangespec):
|
||||
"""Parse an integer range string and return the resulting ranges
|
||||
|
||||
Examples:
|
||||
_parse("-2,4,6-8,10-") -> [(1,3), (4,5), (6,9), (10,INTMAX)]
|
||||
_parse(" - 3 , 4- 4, 2-6") -> [(1,4), (4,5), (2,7)]
|
||||
_parse("1:2,4:8:2") -> [(1,1), (4,7,2)]
|
||||
"""
|
||||
ranges = []
|
||||
|
||||
if isinstance(rangespec, str):
|
||||
rangespec = rangespec.split(",")
|
||||
elif isinstance(rangespec, int):
|
||||
rangespec = (str(rangespec),)
|
||||
|
||||
for group in rangespec:
|
||||
if not group:
|
||||
continue
|
||||
|
||||
elif ":" in group:
|
||||
start, _, stop = group.partition(":")
|
||||
stop, _, step = stop.partition(":")
|
||||
ranges.append(range(
|
||||
int(start) if start.strip() else 1,
|
||||
int(stop) if stop.strip() else sys.maxsize,
|
||||
int(step) if step.strip() else 1,
|
||||
))
|
||||
|
||||
elif "-" in group:
|
||||
start, _, stop = group.partition("-")
|
||||
ranges.append(range(
|
||||
int(start) if start.strip() else 1,
|
||||
int(stop) + 1 if stop.strip() else sys.maxsize,
|
||||
))
|
||||
|
||||
else:
|
||||
start = int(group)
|
||||
ranges.append(range(start, start+1))
|
||||
|
||||
return ranges
|
||||
|
||||
|
||||
class UniquePredicate():
|
||||
def predicate_unique():
|
||||
"""Predicate; True if given URL has not been encountered before"""
|
||||
def __init__(self):
|
||||
self.urls = set()
|
||||
|
||||
def __call__(self, url, _):
|
||||
def _pred(url, _):
|
||||
if url.startswith("text:"):
|
||||
return True
|
||||
if url not in self.urls:
|
||||
self.urls.add(url)
|
||||
if url not in urls:
|
||||
urls.add(url)
|
||||
return True
|
||||
return False
|
||||
urls = set()
|
||||
return _pred
|
||||
|
||||
|
||||
class FilterPredicate():
|
||||
def predicate_filter(expr, target="image"):
|
||||
"""Predicate; True if evaluating the given expression returns True"""
|
||||
|
||||
def __init__(self, expr, target="image"):
|
||||
name = f"<{target} filter>"
|
||||
self.expr = compile_filter(expr, name)
|
||||
|
||||
def __call__(self, _, kwdict):
|
||||
def _pred(_, kwdict):
|
||||
try:
|
||||
return self.expr(kwdict)
|
||||
return expr(kwdict)
|
||||
except exception.GalleryDLException:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise exception.FilterError(exc)
|
||||
expr = compile_filter(expr, f"<{target} filter>")
|
||||
return _pred
|
||||
|
||||
|
||||
def predicate_range(ranges, skip=None):
|
||||
"""Predicate; True if the current index is in the given range(s)"""
|
||||
if ranges := predicate_range_parse(ranges):
|
||||
# technically wrong for 'step > 2', but good enough for now
|
||||
# and evaluating min/max for a large range is slow
|
||||
upper = max(r.stop for r in ranges) - 1
|
||||
lower = min(r.start for r in ranges)
|
||||
index = 0 if skip is None or lower <= 1 else skip(lower)
|
||||
del lower
|
||||
else:
|
||||
index = upper = 0
|
||||
|
||||
def _pred(_url, _kwdict):
|
||||
nonlocal index
|
||||
|
||||
if index >= upper:
|
||||
raise exception.StopExtraction()
|
||||
index += 1
|
||||
|
||||
for range in ranges:
|
||||
if index in range:
|
||||
return True
|
||||
return False
|
||||
return _pred
|
||||
|
||||
|
||||
def predicate_range_parse(rangespec):
|
||||
"""Parse an integer range string and return the resulting ranges
|
||||
|
||||
Examples:
|
||||
_parse("-2,4,6-8,10-") -> [(1,3), (4,5), (6,9), (10,INTMAX)]
|
||||
_parse(" - 3 , 4- 4, 2-6") -> [(1,4), (4,5), (2,7)]
|
||||
_parse("1:2,4:8:2") -> [(1,1), (4,7,2)]
|
||||
"""
|
||||
ranges = []
|
||||
|
||||
if isinstance(rangespec, str):
|
||||
rangespec = rangespec.split(",")
|
||||
elif isinstance(rangespec, int):
|
||||
rangespec = (str(rangespec),)
|
||||
|
||||
for group in rangespec:
|
||||
if not group:
|
||||
continue
|
||||
|
||||
elif ":" in group:
|
||||
start, _, stop = group.partition(":")
|
||||
stop, _, step = stop.partition(":")
|
||||
ranges.append(range(
|
||||
int(start) if start.strip() else 1,
|
||||
int(stop) if stop.strip() else sys.maxsize,
|
||||
int(step) if step.strip() else 1,
|
||||
))
|
||||
|
||||
elif "-" in group:
|
||||
start, _, stop = group.partition("-")
|
||||
ranges.append(range(
|
||||
int(start) if start.strip() else 1,
|
||||
int(stop) + 1 if stop.strip() else sys.maxsize,
|
||||
))
|
||||
|
||||
else:
|
||||
start = int(group)
|
||||
ranges.append(range(start, start+1))
|
||||
|
||||
return ranges
|
||||
|
||||
Reference in New Issue
Block a user