445 lines
15 KiB
Python
445 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
|
|
"""Extractors for https://www.iwara.tv/"""
|
|
|
|
from .common import Extractor, Message, Dispatch
|
|
from .. import text, util
|
|
from ..cache import cache, memcache
|
|
import hashlib
|
|
|
|
# Common URL prefix for every iwara.tv pattern; scheme and "www." are optional
BASE_PATTERN = r"(?:https?://)?(?:www\.)?iwara\.tv"
# Profile URLs; the single capture group is the username path segment
USER_PATTERN = rf"{BASE_PATTERN}/profile/([^/?#]+)"
|
|
|
|
|
|
class IwaraExtractor(Extractor):
    """Base class for iwara.tv extractors"""
    category = "iwara"
    root = "https://www.iwara.tv"
    directory_fmt = ("{category}", "{user[name]}")
    filename_fmt = "{date} {id} {title[:200]} {filename}.{extension}"
    archive_fmt = "{type} {user[name]} {id} {file_id}"

    def _init(self):
        """Create the API interface used by all item generators"""
        self.api = IwaraAPI(self)

    def items_image(self, images, user=None):
        """Yield Directory/Url messages for each image post in 'images'

        Entries may be wrapped in an {"image": ...} container and may lack
        their 'files' list, in which case the full post is requested from
        the API. 'user' overrides the per-post user metadata when given.
        Posts that fail to process are logged and skipped.
        """
        for image in images:
            try:
                if "image" in image:
                    # could extract 'date_favorited' here
                    image = image["image"]
                if not (files := image.get("files")):
                    image = self.api.image(image["id"])
                    files = image["files"]

                group_info = self.extract_media_info(image, "file", False)
                group_info["user"] = (self.extract_user_info(image)
                                      if user is None else user)
            except Exception as exc:
                self.status |= 1
                # use .get() so the handler itself cannot raise KeyError
                self.log.error("Failed to process image %s (%s: %s)",
                               image.get("id"), exc.__class__.__name__, exc)
                continue

            group_info["type"] = "image"
            group_info["count"] = len(files)
            yield Message.Directory, "", group_info
            for num, file in enumerate(files, 1):
                file_info = self.extract_media_info(file, None)
                file_id = file_info["file_id"]
                url = (f"https://i.iwara.tv/image/original/"
                       f"{file_id}/{file_id}.{file_info['extension']}")
                # post-level metadata takes precedence over file metadata
                yield Message.Url, url, {**file_info, **group_info, "num": num}

    def items_video(self, videos, user=None):
        """Yield Directory/Url messages for each video in 'videos'

        Entries may be wrapped in a {"video": ...} container and may lack
        'fileUrl', in which case the full video object is requested from
        the API. 'user' overrides the per-video user metadata when given.
        Videos without a usable "Source" download are logged and skipped.
        """
        for video in videos:
            try:
                if "video" in video:
                    video = video["video"]
                if "fileUrl" not in video:
                    video = self.api.video(video["id"])
                file_url = video["fileUrl"]
                sources = self.api.source(file_url)
                source = next((s for s in sources
                               if s.get("name") == "Source"), None)
                if source is None:
                    raise ValueError("no 'Source' entry in file sources")
                download_url = (source.get("src") or {}).get("download")
                if not download_url:
                    # validate here so the failure is logged below instead
                    # of surfacing as a TypeError outside of the 'try'
                    raise ValueError("'Source' entry has no download URL")
                if not download_url.startswith(("http://", "https://")):
                    # presumably a protocol-relative '//host/path' URL
                    download_url = "https:" + download_url

                info = self.extract_media_info(video, "file")
                info["count"] = info["num"] = 1
                info["user"] = (self.extract_user_info(video)
                                if user is None else user)
            except Exception as exc:
                self.status |= 1
                # use .get() so the handler itself cannot raise KeyError
                self.log.error("Failed to process video %s (%s: %s)",
                               video.get("id"), exc.__class__.__name__, exc)
                continue

            yield Message.Directory, "", info
            yield Message.Url, download_url, info

    def items_user(self, users, key=None):
        """Yield a Queue message for the profile page of each user

        When 'key' is given, the user object is read from entry[key].
        Entries whose 'username' is None are skipped.
        """
        base = self.root + "/profile/"
        for user in users:
            if key is not None:
                user = user[key]
            if (username := user["username"]) is None:
                continue
            user["type"] = "user"
            user["_extractor"] = IwaraUserExtractor
            yield Message.Queue, base + username, user

    def items_by_type(self, type, results):
        """Route 'results' to the item generator matching 'type'

        Raises AbortExtraction for anything other than
        "image", "video", or "user".
        """
        if type == "image":
            return self.items_image(results)
        if type == "video":
            return self.items_video(results)
        if type == "user":
            return self.items_user(results)

        raise self.exc.AbortExtraction(f"Unsupported result type '{type}'")

    def extract_media_info(self, item, key, include_file_info=True):
        """Build a metadata dict from an API media object

        item:
            media (or file) object returned by the API
        key:
            name of the sub-object holding file data,
            or None if 'item' itself is the file object
        include_file_info:
            whether to add file-level fields (name, size, dates, ...)
        """
        info = {
            "id"      : item["id"],
            "slug"    : item.get("slug"),
            "rating"  : item.get("rating"),
            "likes"   : item.get("numLikes"),
            "views"   : item.get("numViews"),
            "comments": item.get("numComments"),
            "tags"    : [t["id"] for t in item.get("tags") or ()],
            "title"   : t.strip() if (t := item.get("title")) else "",
            "description": t.strip() if (t := item.get("body")) else "",
        }

        if include_file_info:
            file_info = item if key is None else item.get(key) or {}
            # 'name' may be missing or null
            name = file_info.get("name") or ""
            filename, dot, extension = name.rpartition(".")
            if not dot:
                # no '.' in the name: all of it is the filename
                filename, extension = name, ""

            info["file_id"] = file_info.get("id")
            info["filename"] = filename
            info["extension"] = extension
            info["date"] = self.parse_datetime_iso(
                file_info.get("createdAt"))
            info["date_updated"] = self.parse_datetime_iso(
                file_info.get("updatedAt"))
            info["mime"] = file_info.get("mime")
            info["size"] = file_info.get("size")
            info["width"] = file_info.get("width")
            info["height"] = file_info.get("height")
            info["duration"] = file_info.get("duration")
            info["type"] = file_info.get("type")

        return info

    def extract_user_info(self, profile):
        """Build a user metadata dict from an object with a 'user' entry

        A missing or empty 'user' yields None for most fields
        and an empty string for 'nick'.
        """
        user = profile.get("user") or {}
        return {
            "id"     : user.get("id"),
            "name"   : user.get("username"),
            "nick"   : nick.strip() if (nick := user.get("name")) else "",
            "status" : user.get("status"),
            "role"   : user.get("role"),
            "premium": user.get("premium"),
            "date"   : self.parse_datetime_iso(user.get("createdAt")),
            "description": profile.get("body"),
        }

    def _user_params(self):
        """Return (user metadata, API params) for a profile sub-page URL

        The URL query string becomes the API parameters,
        with 'user' set to the profile's user ID.
        """
        user, qs = self.groups
        params = text.parse_query(qs)
        profile = self.api.profile(user)
        params["user"] = profile["user"]["id"]
        return self.extract_user_info(profile), params
|
|
|
|
|
|
class IwaraUserExtractor(Dispatch, IwaraExtractor):
    """Extractor for iwara.tv profile pages"""
    pattern = rf"{USER_PATTERN}/?$"
    example = "https://www.iwara.tv/profile/USERNAME"

    def items(self):
        """Hand the profile URL over to the per-section extractors"""
        profile_url = f"{self.root}/profile/{self.groups[0]}/"
        targets = (
            (IwaraUserImagesExtractor   , profile_url + "images"),
            (IwaraUserVideosExtractor   , profile_url + "videos"),
            (IwaraUserPlaylistsExtractor, profile_url + "playlists"),
        )
        # presumably the subcategories enabled by default; see Dispatch
        defaults = ("user-images", "user-videos")
        return self._dispatch_extractors(targets, defaults)
|
|
|
|
|
|
class IwaraUserImagesExtractor(IwaraExtractor):
    """Extractor for the images of an iwara.tv profile"""
    subcategory = "user-images"
    pattern = rf"{USER_PATTERN}/images(?:\?([^#]+))?"
    example = "https://www.iwara.tv/profile/USERNAME/images"

    def items(self):
        """Return image items for the profile, tagged with its user info"""
        user_info, query = self._user_params()
        images = self.api.images(query)
        return self.items_image(images, user_info)
|
|
|
|
|
|
class IwaraUserVideosExtractor(IwaraExtractor):
    """Extractor for the videos of an iwara.tv profile"""
    subcategory = "user-videos"
    pattern = rf"{USER_PATTERN}/videos(?:\?([^#]+))?"
    example = "https://www.iwara.tv/profile/USERNAME/videos"

    def items(self):
        """Return video items for the profile, tagged with its user info"""
        user_info, query = self._user_params()
        videos = self.api.videos(query)
        return self.items_video(videos, user_info)
|
|
|
|
|
|
class IwaraUserPlaylistsExtractor(IwaraExtractor):
    """Extractor for the playlists of an iwara.tv profile"""
    subcategory = "user-playlists"
    pattern = rf"{USER_PATTERN}/playlists(?:\?([^#]+))?"
    example = "https://www.iwara.tv/profile/USERNAME/playlists"

    def items(self):
        """Queue every playlist of the profile for IwaraPlaylistExtractor"""
        prefix = self.root + "/playlist/"
        _, query = self._user_params()

        for entry in self.api.playlists(query):
            entry["type"] = "playlist"
            entry["_extractor"] = IwaraPlaylistExtractor
            yield Message.Queue, prefix + entry["id"], entry
|
|
|
|
|
|
class IwaraFollowingExtractor(IwaraExtractor):
    """Extractor for the users an iwara.tv profile is following"""
    subcategory = "following"
    pattern = rf"{USER_PATTERN}/following"
    example = "https://www.iwara.tv/profile/USERNAME/following"

    def items(self):
        """Queue the profile page of every followed user"""
        profile = self.api.profile(self.groups[0])
        user_id = profile["user"]["id"]
        following = self.api.user_following(user_id)
        # each result entry holds the followed account under 'user'
        return self.items_user(following, "user")
|
|
|
|
|
|
class IwaraFollowersExtractor(IwaraExtractor):
    """Extractor for the followers of an iwara.tv profile"""
    subcategory = "followers"
    pattern = rf"{USER_PATTERN}/followers"
    example = "https://www.iwara.tv/profile/USERNAME/followers"

    def items(self):
        """Queue the profile page of every follower"""
        profile = self.api.profile(self.groups[0])
        user_id = profile["user"]["id"]
        followers = self.api.user_followers(user_id)
        # each result entry holds the following account under 'follower'
        return self.items_user(followers, "follower")
|
|
|
|
|
|
class IwaraImageExtractor(IwaraExtractor):
    """Extractor for individual iwara.tv image pages"""
    subcategory = "image"
    pattern = rf"{BASE_PATTERN}/image/([^/?#]+)"
    example = "https://www.iwara.tv/image/ID"

    def items(self):
        """Return the items of the single image post named in the URL"""
        post = self.api.image(self.groups[0])
        return self.items_image((post,))
|
|
|
|
|
|
class IwaraVideoExtractor(IwaraExtractor):
    """Extractor for individual iwara.tv videos"""
    subcategory = "video"
    pattern = rf"{BASE_PATTERN}/video/([^/?#]+)"
    example = "https://www.iwara.tv/video/ID"

    def items(self):
        """Return the items of the single video named in the URL"""
        video = self.api.video(self.groups[0])
        return self.items_video((video,))
|
|
|
|
|
|
class IwaraPlaylistExtractor(IwaraExtractor):
    """Extractor for individual iwara.tv playlist pages"""
    subcategory = "playlist"
    pattern = rf"{BASE_PATTERN}/playlist/([^/?#]+)"
    example = "https://www.iwara.tv/playlist/ID"

    def items(self):
        """Return video items for every entry of the playlist"""
        entries = self.api.playlist(self.groups[0])
        return self.items_video(entries)
|
|
|
|
|
|
class IwaraFavoriteExtractor(IwaraExtractor):
    """Extractor for the logged-in user's iwara.tv favorites"""
    subcategory = "favorite"
    pattern = BASE_PATTERN + r"/favorites(?:/(image|video)s)?"
    example = "https://www.iwara.tv/favorites/videos"

    def items(self):
        """Return favorited images or videos, depending on the URL"""
        # A bare '/favorites' URL captures nothing; fall back to videos.
        # The fallback must be a type that items_by_type() accepts and
        # that yields a valid '/favorites/{type}s' endpoint.
        media_type = self.groups[0] or "video"
        return self.items_by_type(media_type, self.api.favorites(media_type))
|
|
|
|
|
|
class IwaraSearchExtractor(IwaraExtractor):
    """Extractor for iwara.tv search pages"""
    subcategory = "search"
    pattern = rf"{BASE_PATTERN}/search\?([^#]+)"
    example = "https://www.iwara.tv/search?query=QUERY&type=TYPE"

    def items(self):
        """Return search results for the URL's 'query' and 'type' values"""
        args = text.parse_query(self.groups[0])
        result_type = args.get("type")
        search_query = args.get("query")
        # expose the search term as metadata
        self.kwdict["search_tags"] = search_query
        results = self.api.search(result_type, search_query)
        return self.items_by_type(result_type, results)
|
|
|
|
|
|
class IwaraTagExtractor(IwaraExtractor):
    """Extractor for iwara.tv tag search"""
    subcategory = "tag"
    pattern = rf"{BASE_PATTERN}/(image|video)s(?:\?([^#]+))?"
    example = "https://www.iwara.tv/videos?tags=TAGS"

    def items(self):
        """Return image or video listings filtered by the URL's query"""
        media_type, query_string = self.groups
        args = text.parse_query(query_string)
        # expose the requested tags as metadata
        self.kwdict["search_tags"] = args.get("tags")
        results = self.api.media(media_type, args)
        return self.items_by_type(media_type, results)
|
|
|
|
|
|
class IwaraAPI():
    """Interface for the Iwara API"""
    root = "https://api.iwara.tv"

    def __init__(self, extractor):
        """Bind the API to 'extractor' and set up default request headers"""
        self.extractor = extractor
        # Exception namespace for the 'raise self.exc....' statements
        # below; this class defines none of its own, so borrow the one
        # the extractor classes already use.
        self.exc = extractor.exc
        self.headers = {
            "Referer"     : extractor.root + "/",
            "Content-Type": "application/json",
            "Origin"      : extractor.root,
        }

        self.username, self.password = extractor._get_auth_info()
        if not self.username:
            # no credentials: replace authenticate() with a no-op
            self.authenticate = util.noop

    def image(self, image_id):
        """Return the API object for a single image post"""
        endpoint = "/image/" + image_id
        return self._call(endpoint)

    def video(self, video_id):
        """Return the API object for a single video"""
        endpoint = "/video/" + video_id
        return self._call(endpoint)

    def playlist(self, playlist_id):
        """Yield the entries of a playlist"""
        endpoint = "/playlist/" + playlist_id
        return self._pagination(endpoint)

    def detail(self, media):
        """Return the full API object for 'media' by its 'type' and 'id'"""
        endpoint = f"/{media['type']}/{media['id']}"
        return self._call(endpoint)

    def images(self, params):
        """Yield image posts matching 'params' (rating defaults to "all")"""
        endpoint = "/images"
        params.setdefault("rating", "all")
        return self._pagination(endpoint, params)

    def videos(self, params):
        """Yield videos matching 'params' (rating defaults to "all")"""
        endpoint = "/videos"
        params.setdefault("rating", "all")
        return self._pagination(endpoint, params)

    def playlists(self, params):
        """Yield playlists matching 'params'"""
        endpoint = "/playlists"
        return self._pagination(endpoint, params)

    def media(self, type, params):
        """Yield '{type}s' listings for 'params' (rating defaults to "all")"""
        endpoint = f"/{type}s"
        params.setdefault("rating", "all")
        return self._pagination(endpoint, params)

    def favorites(self, type):
        """Yield the logged-in user's favorites of the given type

        Raises AuthRequired when no username is configured.
        """
        if not self.username:
            raise self.exc.AuthRequired(
                "username & password", "your favorites")
        endpoint = f"/favorites/{type}s"
        return self._pagination(endpoint)

    def search(self, type, query):
        """Yield search results of the given type for 'query'"""
        endpoint = "/search"
        params = {"type": type, "query": query}
        return self._pagination(endpoint, params)

    @memcache(keyarg=1)
    def profile(self, username):
        """Return the profile object for 'username'"""
        endpoint = "/profile/" + username
        return self._call(endpoint)

    def user_following(self, user_id):
        """Yield the accounts followed by 'user_id'"""
        endpoint = f"/user/{user_id}/following"
        return self._pagination(endpoint)

    def user_followers(self, user_id):
        """Yield the followers of 'user_id'"""
        endpoint = f"/user/{user_id}/followers"
        return self._pagination(endpoint)

    def source(self, file_url):
        """Return the list of file sources behind a video's 'fileUrl'

        The request carries an 'X-Version' header: the SHA-1 hex digest of
        "<file id>_<expires>_<fixed postfix>". Returns an empty tuple when
        no 'expires=' value can be extracted from the URL's query string.
        """
        base, _, query = file_url.partition("?")
        if not (expires := text.extr(query, "expires=", "&")):
            return ()
        file_id = base.rpartition("/")[2]
        sha_postfix = "5nFp9kmbNnHdAFhaqMvt"
        sha_key = f"{file_id}_{expires}_{sha_postfix}"
        version = hashlib.sha1(sha_key.encode()).hexdigest()
        headers = {"X-Version": version, **self.headers}
        return self.extractor.request_json(file_url, headers=headers)

    def authenticate(self):
        """Store a (cached) access token in the default request headers"""
        self.headers["Authorization"] = self._authenticate_impl(self.username)

    @cache(maxage=3600, keyarg=1)
    def _authenticate_impl(self, username):
        """Return a "Bearer ..." access token value for 'username'

        Logs in with username and password when no refresh token is cached,
        then exchanges the refresh token for an access token.
        Raises AuthenticationError if either request returns no token.
        """
        refresh_token = _refresh_token_cache(username)
        if refresh_token is None:
            self.extractor.log.info("Logging in as %s", username)

            url = self.root + "/user/login"
            payload = {
                "email"   : username,
                "password": self.password
            }
            data = self.extractor.request_json(
                url, method="POST", headers=self.headers, json=payload,
                fatal=False)

            if not (refresh_token := data.get("token")):
                self.extractor.log.debug(data)
                raise self.exc.AuthenticationError(data.get("message"))
            _refresh_token_cache.update(username, refresh_token)

        self.extractor.log.info("Refreshing access token for %s", username)

        url = self.root + "/user/token"
        # The refresh token goes last so that it wins over an
        # 'Authorization' value left in self.headers by an earlier
        # authenticate() call.
        headers = {**self.headers, "Authorization": "Bearer " + refresh_token}
        data = self.extractor.request_json(
            url, method="POST", headers=headers, fatal=False)

        if not (access_token := data.get("accessToken")):
            self.extractor.log.debug(data)
            raise self.exc.AuthenticationError(data.get("message"))
        return "Bearer " + access_token

    def _call(self, endpoint, params=None, headers=None):
        """Authenticate, then GET 'endpoint' and return the JSON response"""
        if headers is None:
            headers = self.headers

        url = self.root + endpoint
        self.authenticate()
        return self.extractor.request_json(url, params=params, headers=headers)

    def _pagination(self, endpoint, params=None):
        """Yield all 'results' entries of 'endpoint', 50 per request

        Note that 'params' is modified in place ('page' and 'limit').
        Stops at the first empty or partially filled page.
        """
        if params is None:
            params = {}
        params["page"] = 0
        params["limit"] = 50

        while True:
            data = self._call(endpoint, params)

            if not (results := data.get("results")):
                break
            yield from results

            if len(results) < params["limit"]:
                break
            params["page"] += 1
|
|
|
|
|
@cache(maxage=28 * 24 * 3600, keyarg=0)
def _refresh_token_cache(username):
    """Cache slot for the refresh token of 'username'

    The function itself always returns None; a token is only available
    after one was stored with '_refresh_token_cache.update()'.
    """
    return None
|