177 lines
5.3 KiB
Python
177 lines
5.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright 2024-2026 Mike Fährmann
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
|
|
"""Extractors for https://rule34.xyz/"""
|
|
|
|
from .booru import BooruExtractor
|
|
from .. import text
|
|
from ..cache import cache
|
|
import collections
|
|
|
|
BASE_PATTERN = r"(?:https?://)?(?:www\.)?rule34\.xyz"
|
|
|
|
|
|
class Rule34xyzExtractor(BooruExtractor):
|
|
category = "rule34xyz"
|
|
root = "https://rule34.xyz"
|
|
root_cdn = "https://rule34xyz.b-cdn.net"
|
|
filename_fmt = "{category}_{id}.{extension}"
|
|
per_page = 60
|
|
|
|
TAG_TYPES = {
|
|
None: "general",
|
|
0 : "general",
|
|
1 : "general",
|
|
2 : "copyright",
|
|
4 : "character",
|
|
8 : "artist",
|
|
16 : "system",
|
|
32 : "meta",
|
|
}
|
|
FORMATS = {
|
|
"10" : "pic.jpg",
|
|
"100": "mov.mp4",
|
|
"101": "mov720.mp4",
|
|
"102": "mov480.mp4",
|
|
}
|
|
|
|
def _init(self):
|
|
if formats := self.config("format"):
|
|
if isinstance(formats, str):
|
|
formats = formats.split(",")
|
|
self.formats = formats
|
|
else:
|
|
self.formats = ("100", "101", "102", "10")
|
|
|
|
def _file_url(self, post):
|
|
files = post["files"]
|
|
|
|
for fmt in self.formats:
|
|
if fmt in files:
|
|
extension = self.FORMATS.get(fmt)
|
|
break
|
|
else:
|
|
self.log.warning("%s: Requested format not available", post["id"])
|
|
fmt = next(iter(files))
|
|
|
|
post_id = post["id"]
|
|
root = self.root_cdn if files[fmt][0] else self.root
|
|
post["file_url"] = url = \
|
|
f"{root}/posts/{post_id // 1000}/{post_id}/{post_id}.{extension}"
|
|
post["format_id"] = fmt
|
|
post["format"] = extension.partition(".")[0]
|
|
|
|
return url
|
|
|
|
def _prepare(self, post):
|
|
post.pop("files", None)
|
|
post["date"] = self.parse_datetime_iso(post["created"])
|
|
post["filename"], _, post["format"] = post["filename"].rpartition(".")
|
|
if "tags" in post:
|
|
post["tags"] = [t["value"] for t in post["tags"]]
|
|
|
|
def _tags(self, post, _):
|
|
if "tags" not in post:
|
|
post.update(self._fetch_post(post["id"]))
|
|
|
|
tags = collections.defaultdict(list)
|
|
for tag in post["tags"]:
|
|
tags[tag["type"]].append(tag["value"])
|
|
types = self.TAG_TYPES
|
|
for type, values in tags.items():
|
|
post["tags_" + types[type]] = values
|
|
|
|
def _fetch_post(self, post_id):
|
|
url = f"{self.root}/api/v2/post/{post_id}"
|
|
return self.request_json(url)
|
|
|
|
def _pagination(self, endpoint, params=None):
|
|
url = f"{self.root}/api{endpoint}"
|
|
|
|
if params is None:
|
|
params = {}
|
|
params["Skip"] = self.page_start * self.per_page
|
|
params["take"] = self.per_page
|
|
params["CountTotal"] = False
|
|
params["IncludeLinks"] = True
|
|
params["OrderBy"] = 0
|
|
threshold = self.per_page
|
|
|
|
while True:
|
|
data = self.request_json(url, method="POST", json=params)
|
|
|
|
yield from data["items"]
|
|
|
|
if len(data["items"]) < threshold:
|
|
return
|
|
params["Skip"] += self.per_page
|
|
params["cursor"] = data["cursor"]
|
|
|
|
def login(self):
|
|
username, password = self._get_auth_info()
|
|
if username:
|
|
self.session.headers["Authorization"] = \
|
|
self._login_impl(username, password)
|
|
|
|
@cache(maxage=3650*86400, keyarg=1)
|
|
def _login_impl(self, username, password):
|
|
self.log.info("Logging in as %s", username)
|
|
|
|
url = self.root + "/api/v2/auth/signin"
|
|
data = {"email": username, "password": password}
|
|
response = self.request_json(
|
|
url, method="POST", json=data, fatal=False)
|
|
|
|
if jwt := response.get("jwt"):
|
|
return "Bearer " + jwt
|
|
raise self.exc.AuthenticationError(
|
|
(msg := response.get("message")) and f'"{msg}"')
|
|
|
|
|
|
class Rule34xyzPostExtractor(Rule34xyzExtractor):
|
|
subcategory = "post"
|
|
archive_fmt = "{id}"
|
|
pattern = BASE_PATTERN + r"/post/(\d+)"
|
|
example = "https://rule34.xyz/post/12345"
|
|
|
|
def posts(self):
|
|
return (self._fetch_post(self.groups[0]),)
|
|
|
|
|
|
class Rule34xyzPlaylistExtractor(Rule34xyzExtractor):
|
|
subcategory = "playlist"
|
|
directory_fmt = ("{category}", "{playlist_id}")
|
|
archive_fmt = "p_{playlist_id}_{id}"
|
|
pattern = BASE_PATTERN + r"/playlists/view/(\d+)"
|
|
example = "https://rule34.xyz/playlists/view/12345"
|
|
|
|
def metadata(self):
|
|
return {"playlist_id": self.groups[0]}
|
|
|
|
def posts(self):
|
|
endpoint = "/v2/post/search/playlist/" + self.groups[0]
|
|
return self._pagination(endpoint)
|
|
|
|
|
|
class Rule34xyzTagExtractor(Rule34xyzExtractor):
|
|
subcategory = "tag"
|
|
directory_fmt = ("{category}", "{search_tags}")
|
|
archive_fmt = "t_{search_tags}_{id}"
|
|
pattern = BASE_PATTERN + r"/([^/?#]+)$"
|
|
example = "https://rule34.xyz/TAG"
|
|
|
|
def metadata(self):
|
|
self.tags = text.unquote(text.unquote(
|
|
self.groups[0]).replace("_", " ")).split("|")
|
|
return {"search_tags": ", ".join(self.tags)}
|
|
|
|
def posts(self):
|
|
endpoint = "/v2/post/search/root"
|
|
params = {"includeTags": self.tags}
|
|
return self._pagination(endpoint, params)
|