#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright 2025-2026 Mike Fährmann
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
|
|
"""Generate test result data"""
|
|
|
|
import logging
|
|
import argparse
|
|
import json
|
|
import util
|
|
from pyprint import pyprint
|
|
from gallery_dl import extractor, job, config, exception
|
|
|
|
# Logger used for this script's own status and error messages
LOG = logging.getLogger("gen-test")
|
|
|
|
|
|
class LoggingCapture(logging.Handler):
    """Context manager that collects log output while it is active

    With a true 'args.logging', entering the context attaches this handler
    to the root logger. Each record it receives is stored in 'records' and
    its formatted text in 'output'; the handler level is set to INFO.
    With a false 'args.logging', nothing is captured and '__enter__'
    returns None, so callers can tell that no log data was collected.
    """

    def __init__(self, args):
        logging.Handler.__init__(self)

        if args.logging:
            self.records = []
            self.output = []
            self.level = logging.INFO
        else:
            # disabled: '__enter__' checks for None and never attaches
            self.records = self.output = None

    def __enter__(self):
        if self.records is None:
            return None

        logging.getLogger(None).addHandler(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Detach from the root logger so messages logged after the 'with'
        # block are no longer appended to 'records' / 'output'.
        # 'removeHandler' is a no-op if the handler was never attached.
        logging.getLogger(None).removeHandler(self)

    def flush(self):
        pass

    def emit(self, record):
        self.records.append(record)
        self.output.append(self.format(record))
|
|
|
|
|
|
def generate_test_result(args):
    """Run the extractor from 'args' and return its test result as text

    Unless 'args.only_matching' is set, this applies the requested
    configuration (auth config file, '-o' options, range), runs a DataJob
    for 'args.extr' and derives the option and metadata entries from its
    collected data. The returned string is the pyprint() rendering of the
    head, with options and metadata spliced in, followed by ",\n\n".
    """
    head = generate_head(args)
    opts = meta = None

    if not args.only_matching:
        if args.auth:
            cfg = util.path("archive", "config.json")
            config.load((cfg,), strict=True)

        if args.options:
            parsed = {}
            args.options_parsed = parsed
            for item in args.options:
                name, _, raw = item.partition("=")
                try:
                    raw = json.loads(raw)
                except ValueError:
                    # not valid JSON: keep the value as a plain string
                    pass
                parsed[name] = raw
                config.set((), name, raw)

        if args.range:
            for name in ("image-range", "chapter-range"):
                config.set((), name, args.range)

        djob = job.DataJob(args.extr, file=None)
        # store a copy of every metadata dict the job collects
        djob.filter = dict.copy
        with LoggingCapture(args) as log_info:
            djob.run()

        opts = generate_opts(
            args, djob.data_urls, djob.data_meta, djob.exception, log_info)
        # 'ool' is only read below when 'opts' is non-empty
        ool = ("#options" in opts or len(opts) > 1)

        if args.metadata:
            meta = generate_meta(args, djob.data_meta)

    text = pyprint(head, oneline=False, lmin=9)
    if opts:
        # drop the last two characters of the head and the first character
        # of the rendered options so both join into one literal
        text = text[:-2] + pyprint(opts, oneline=ool, lmin=9)[1:]
    if meta:
        text = text[:-1] + pyprint(meta, sort=sort_key)[1:]
    return text + ",\n\n"
|
|
|
|
|
|
def generate_head(args):
    """Build the leading entries of a test result

    Always contains "#url" and "#class". "#comment" is added when a comment
    was given (an empty string counts). "#category" is added when a base
    category is set or when the category or subcategory in 'args' differs
    from the one declared on the extractor class.
    """
    cls = args.cls
    head = {"#url": args.extr.url}

    if args.comment is not None:
        head["#comment"] = args.comment

    matches_class = (
        not args.base
        and args.cat == cls.category
        and args.sub == cls.subcategory
    )
    if not matches_class:
        head["#category"] = (args.base, args.cat, args.sub)

    head["#class"] = cls
    return head
|
|
|
|
|
|
def generate_opts(args, urls, meta=(), exc=None, log=None):
    """Build the '#...' option entries of a test result

    args: parsed arguments (auth, options, options_parsed, range, limit_urls)
    urls: list of collected URLs
    meta: sequence of metadata dicts collected alongside 'urls'
    exc:  exception recorded by the job, if any
    log:  object with 'records' and 'output' lists, or None to skip "#log"

    Returns a dict whose insertion order is the order of the entries.
    """
    opts = {}

    if args.auth is not None:
        opts["#auth"] = args.auth
    if args.options:
        opts["#options"] = args.options_parsed
    if args.range:
        opts["#range"] = args.range

    if exc:
        if isinstance(exc, exception.GalleryDLException):
            opts["#exception"] = exc.__class__.__name__
        else:
            opts["#exception"] = exc.__class__
    elif not urls:
        opts["#count"] = 0
    elif len(urls) == 1:
        opts["#results"] = urls[0]
    elif len(urls) < args.limit_urls:
        opts["#results"] = tuple(urls)
    else:
        # too many URLs to list: record a pattern and a count instead
        extr = meta[0].get("_extractor") if meta else None
        if extr:
            module = extr.__module__.rpartition(".")[2]
            if module[0].isdecimal():
                module = f"_{module}"
            opts["#pattern"] = f"lit:{module}.{extr.__name__}.pattern"
        else:
            import re
            opts["#pattern"] = re.escape(urls[0])

        # re-insert "#range" so it comes after "#pattern"
        if "#range" in opts:
            opts["#range"] = opts.pop("#range")
        opts["#count"] = len(urls)

    if log is not None:
        captured = log.records
        if not captured:
            opts["#log"] = ()
        elif len(captured) == 1:
            opts["#log"] = log.output[0]
        else:
            opts["#log"] = log.output

    return opts
|
|
|
|
|
|
def generate_meta(args, data):
    """Strip internal keys from collected metadata and return the first dict

    args: parsed arguments (unused here)
    data: list of metadata dicts; each one is modified in place

    Removes "category", "subcategory", empty keys and keys starting with
    "_" from every dict in 'data'. Returns 'data[0]', or an empty dict
    when 'data' is empty.
    """
    if not data:
        return {}

    for kwdict in data:
        # collect first: a dict must not change size while being iterated
        hidden = [key for key in kwdict if not key or key[0] == "_"]
        for key in ("category", "subcategory", *hidden):
            # pop() with a default tolerates dicts that lack a key, e.g.
            # when the same dict object is listed more than once
            kwdict.pop(key, None)

    return data[0]
|
|
|
|
|
|
def sort_key(key, value):
    """Return a sort weight for a metadata entry based on its value

    Falsy values and everything not listed below weigh 0.
    Strings containing a newline weigh 7000, lists that are not small()
    weigh 8000, dicts that are not small() weigh 9000.
    """
    if not value:
        return 0
    if isinstance(value, str):
        return 7000 if "\n" in value else 0
    for kind, weight in ((list, 8000), (dict, 9000)):
        if isinstance(value, kind) and not small(value):
            return weight
    return 0
|
|
|
|
|
|
def small(obj):
    """Tell whether 'obj' is empty, a scalar, or a chain of 1-element containers

    A list or dict with more than one element is not small. One with a
    single element is as small as that element (for dicts: its value).
    """
    if obj:
        if isinstance(obj, list):
            return len(obj) <= 1 and small(obj[0])
        if isinstance(obj, dict):
            return len(obj) <= 1 and small(next(iter(obj.values())))
    return True
|
|
|
|
|
|
def insert_test_result(args, result, lines):
    """Insert 'result' into 'lines' next to existing results of the same class

    Scans for '"#class"' lines. Once a line naming 'args.cls' has been seen,
    the next '"#class"' line that does not name it ends the scan, and
    'result' is inserted at the most recent line starting with "{".
    Otherwise 'result' is inserted before the last element of 'lines'.
    """
    last_open = None
    seen_own = False
    found_next = False

    for pos, raw in enumerate(lines):
        text = raw.lstrip()
        if text.startswith("{"):
            last_open = pos
        elif text.startswith('"#class"'):
            if args.cls.__name__ in text:
                seen_own = True
            elif seen_own:
                # first entry of another class after our own entries
                found_next = True
                break

    if found_next and last_open is not None:
        target = last_open
    else:
        target = -1
    lines.insert(target, result)
|
|
|
|
|
|
def parse_args(args=None):
    """Parse command-line arguments and return the resulting namespace

    args: optional list of argument strings; None lets argparse fall back
          to the process's command-line arguments
    """
    # 'args' belongs in parse_args() below. Passing it to ArgumentParser()
    # would set the program name ('prog') instead of the arguments to parse.
    parser = argparse.ArgumentParser()
    parser.add_argument("-a", "--auth", action="store_true", default=None)
    parser.add_argument("-A", "--no-auth", action="store_false", dest="auth")
    parser.add_argument("-c", "--comment", default=None)
    parser.add_argument("-C", dest="comment", action="store_const", const="")
    parser.add_argument("-g", "--git", action="store_true")
    parser.add_argument("-l", "--logging", action="store_true")
    parser.add_argument("-L", "--limit_urls", type=int, default=10)
    parser.add_argument("-m", "--metadata", action="store_true")
    parser.add_argument("-o", "--option", dest="options", action="append")
    parser.add_argument("-O", "--only-matching", action="store_true")
    parser.add_argument("-r", "--range")
    parser.add_argument("URL")

    return parser.parse_args(args)
|
|
|
|
|
|
def main():
    """Generate a test result for the given URL and store it

    Finds the extractor for the URL, generates its test result text and
    inserts it into the results file named after the extractor's category.
    With '--git', the file is then passed to 'git add'.
    Exits with status 1 when no extractor matches the URL.
    """
    args = parse_args()
    url = args.url = args.URL

    extr = extractor.find(url)
    if extr is None:
        LOG.error("Unsupported URL '%s'", url)
        raise SystemExit(1)

    # expose the extractor and its identifiers to the generate_* helpers
    args.extr = extr
    args.cls = extr.__class__
    args.cat = extr.category
    args.sub = extr.subcategory
    args.base = extr.basecategory

    LOG.info("Collecting data for '%s'", url)
    result = generate_test_result(args)

    path = util.path("test", "results", f"{args.cat}.py")
    path_tr = util.trim(path)
    LOG.info("Writing '%s' results to '%s'", url, path_tr)
    # NOTE(review): util.lines() presumably writes the modified list back
    # to 'path' when the context exits - confirm in util
    with util.lines(path) as lines:
        insert_test_result(args, result, lines)

    if not args.git:
        return
    LOG.info("git add %s", path_tr)
    util.git("add", "--", path_tr)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: log everything from DEBUG upwards,
    # prefixed with the level name
    logging.basicConfig(format="[%(levelname)s] %(message)s", level=logging.DEBUG)
    main()
|