[util] update code to 3.8

This commit is contained in:
Mike Fährmann
2025-06-17 15:10:20 +02:00
parent 6d928f3805
commit c88d69376f

View File

@@ -61,8 +61,7 @@ def bdecode(data, alphabet="0123456789"):
num = 0 num = 0
base = len(alphabet) base = len(alphabet)
for c in data: for c in data:
num *= base num = num * base + alphabet.find(c)
num += alphabet.index(c)
return num return num
@@ -172,18 +171,17 @@ def sha1(s):
def generate_token(size=16): def generate_token(size=16):
"""Generate a random token with hexadecimal digits""" """Generate a random token with hexadecimal digits"""
data = random.getrandbits(size * 8).to_bytes(size, "big") return random.getrandbits(size * 8).to_bytes(size, "big").hex()
return binascii.hexlify(data).decode()
def format_value(value, suffixes="kMGTPEZY"): def format_value(value, suffixes="kMGTPEZY"):
value = format(value) value = str(value)
value_len = len(value) value_len = len(value)
index = value_len - 4 index = value_len - 4
if index >= 0: if index >= 0:
offset = (value_len - 1) % 3 + 1 offset = (value_len - 1) % 3 + 1
return (value[:offset] + "." + value[offset:offset+2] + return (f"{value[:offset]}.{value[offset:offset+2]}"
suffixes[index // 3]) f"{suffixes[index // 3]}")
return value return value
@@ -339,30 +337,12 @@ def dump_response(response, fp, headers=False, content=True, hide_auth=True):
request = response.request request = response.request
req_headers = request.headers.copy() req_headers = request.headers.copy()
res_headers = response.headers.copy() res_headers = response.headers.copy()
outfmt = """\
{request.method} {request.url}
Status: {response.status_code} {response.reason}
Request Headers
---------------
{request_headers}
"""
if request.body:
outfmt += """
Request Body
------------
{request.body}
"""
outfmt += """
Response Headers
----------------
{response_headers}
"""
if hide_auth: if hide_auth:
authorization = req_headers.get("Authorization") authorization = req_headers.get("Authorization")
if authorization: if authorization:
atype, sep, _ = str(authorization).partition(" ") atype, sep, _ = str(authorization).partition(" ")
req_headers["Authorization"] = atype + " ***" if sep else "***" req_headers["Authorization"] = f"{atype} ***" if sep else "***"
cookie = req_headers.get("Cookie") cookie = req_headers.get("Cookie")
if cookie: if cookie:
@@ -376,20 +356,35 @@ Response Headers
res_headers["Set-Cookie"] = re(r"(^|, )([^ =]+)=[^,;]*").sub( res_headers["Set-Cookie"] = re(r"(^|, )([^ =]+)=[^,;]*").sub(
r"\1\2=***", set_cookie) r"\1\2=***", set_cookie)
fmt_nv = "{}: {}".format request_headers = "\n".join(
f"{name}: {value}"
for name, value in req_headers.items()
)
response_headers = "\n".join(
f"{name}: {value}"
for name, value in res_headers.items()
)
fp.write(outfmt.format( output = f"""\
request=request, {request.method} {request.url}
response=response, Status: {response.status_code} {response.reason}
request_headers="\n".join(
fmt_nv(name, value) Request Headers
for name, value in req_headers.items() ---------------
), {request_headers}
response_headers="\n".join( """
fmt_nv(name, value) if request.body:
for name, value in res_headers.items() output = f"""{output}
), Request Body
).encode()) ------------
{request.body}
"""
output = f"""{output}
Response Headers
----------------
{response_headers}
"""
fp.write(output.encode())
if content: if content:
if headers: if headers:
@@ -528,8 +523,7 @@ def cookiestxt_load(fp):
def cookiestxt_store(fp, cookies): def cookiestxt_store(fp, cookies):
"""Write 'cookies' in Netscape cookies.txt format to 'fp'""" """Write 'cookies' in Netscape cookies.txt format to 'fp'"""
write = fp.write fp.write("# Netscape HTTP Cookie File\n\n")
write("# Netscape HTTP Cookie File\n\n")
for cookie in cookies: for cookie in cookies:
if not cookie.domain: if not cookie.domain:
@@ -543,7 +537,7 @@ def cookiestxt_store(fp, cookies):
value = cookie.value value = cookie.value
domain = cookie.domain domain = cookie.domain
write("\t".join(( fp.write("\t".join((
domain, domain,
"TRUE" if domain and domain[0] == "." else "FALSE", "TRUE" if domain and domain[0] == "." else "FALSE",
cookie.path, cookie.path,
@@ -608,8 +602,7 @@ class HTTPBasicAuth():
def __init__(self, username, password): def __init__(self, username, password):
self.authorization = b"Basic " + binascii.b2a_base64( self.authorization = b"Basic " + binascii.b2a_base64(
username.encode("latin1") + b":" + str(password).encode("latin1") f"{username}:{password}".encode("latin1"), newline=False)
)[:-1]
def __call__(self, request): def __call__(self, request):
request.headers["Authorization"] = self.authorization request.headers["Authorization"] = self.authorization
@@ -773,11 +766,11 @@ EXTS_VIDEO = {"mp4", "m4v", "mov", "webm", "mkv", "ogv", "flv", "avi", "wmv"}
EXTS_ARCHIVE = {"zip", "rar", "7z", "tar", "gz", "bz2", "lzma", "xz"} EXTS_ARCHIVE = {"zip", "rar", "7z", "tar", "gz", "bz2", "lzma", "xz"}
USERAGENT = "gallery-dl/" + version.__version__ USERAGENT = "gallery-dl/" + version.__version__
USERAGENT_FIREFOX = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{}.0) " USERAGENT_FIREFOX = (f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; "
"Gecko/20100101 Firefox/{}.0").format(_ff_ver, _ff_ver) f"rv:{_ff_ver}.0) Gecko/20100101 Firefox/{_ff_ver}.0")
USERAGENT_CHROME = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) " USERAGENT_CHROME = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{}.0.0.0 " "AppleWebKit/537.36 (KHTML, like Gecko) "
"Safari/537.36").format(_ff_ver - 2) f"Chrome/{_ff_ver - 2}.0.0.0 Safari/537.36")
GLOBALS = { GLOBALS = {
"contains" : contains, "contains" : contains,
@@ -1057,7 +1050,6 @@ class RangePredicate():
_parse("1:2,4:8:2") -> [(1,1), (4,7,2)] _parse("1:2,4:8:2") -> [(1,1), (4,7,2)]
""" """
ranges = [] ranges = []
append = ranges.append
if isinstance(rangespec, str): if isinstance(rangespec, str):
rangespec = rangespec.split(",") rangespec = rangespec.split(",")
@@ -1069,7 +1061,7 @@ class RangePredicate():
elif ":" in group: elif ":" in group:
start, _, stop = group.partition(":") start, _, stop = group.partition(":")
stop, _, step = stop.partition(":") stop, _, step = stop.partition(":")
append(range( ranges.append(range(
int(start) if start.strip() else 1, int(start) if start.strip() else 1,
int(stop) if stop.strip() else sys.maxsize, int(stop) if stop.strip() else sys.maxsize,
int(step) if step.strip() else 1, int(step) if step.strip() else 1,
@@ -1077,14 +1069,14 @@ class RangePredicate():
elif "-" in group: elif "-" in group:
start, _, stop = group.partition("-") start, _, stop = group.partition("-")
append(range( ranges.append(range(
int(start) if start.strip() else 1, int(start) if start.strip() else 1,
int(stop) + 1 if stop.strip() else sys.maxsize, int(stop) + 1 if stop.strip() else sys.maxsize,
)) ))
else: else:
start = int(group) start = int(group)
append(range(start, start+1)) ranges.append(range(start, start+1))
return ranges return ranges
@@ -1107,7 +1099,7 @@ class FilterPredicate():
"""Predicate; True if evaluating the given expression returns True""" """Predicate; True if evaluating the given expression returns True"""
def __init__(self, expr, target="image"): def __init__(self, expr, target="image"):
name = "<{} filter>".format(target) name = f"<{target} filter>"
self.expr = compile_filter(expr, name) self.expr = compile_filter(expr, name)
def __call__(self, _, kwdict): def __call__(self, _, kwdict):