[lofter]: add initial support

This commit is contained in:
hdk5
2024-12-05 17:38:57 +02:00
parent d9bbe3b3b3
commit 0466fcab4c
2 changed files with 151 additions and 0 deletions

View File

@@ -98,6 +98,7 @@ modules = [
"lexica",
"lightroom",
"livedoor",
"lofter",
"luscious",
"lynxchan",
"mangadex",

View File

@@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://www.lofter.com/"""
from .common import Extractor, Message
from .. import text, util, exception
class LofterExtractor(Extractor):
    """Base class for lofter extractors"""
    category = "lofter"
    root = "https://www.lofter.com"
    directory_fmt = ("{category}", "{blog_name}")
    filename_fmt = "{id}_{num}.{extension}"
    archive_fmt = "{id}_{num}"

    def _init(self):
        """Create the API client that subclasses use in posts()"""
        self.api = LofterAPI(self)

    def items(self):
        """Yield one Directory message per post, then one Url per file

        Each post dict is extended with 'blog_name', 'count' and, while
        its files are being yielded, a 1-based 'num'.
        """
        for post in self.posts():
            # Some results nest the actual post object under a "post" key
            if "post" in post:
                post = post["post"]

            post["blog_name"] = post["blogInfo"]["blogName"]
            image_urls = self._image_urls(post)

            post["count"] = len(image_urls)
            yield Message.Directory, post
            for post["num"], url in enumerate(image_urls, 1):
                yield Message.Url, url, text.nameext_from_url(url, post)

    def posts(self):
        """Return an iterable of post objects; overridden by subclasses"""
        return ()

    def _image_urls(self, post):
        """Return the list of file URLs of 'post', chosen by its 'type'

        Unsupported types are logged as a warning and produce an empty
        list, so the post's Directory message is still emitted.
        """
        post_type = post["type"]

        # Article: files are <img> tags inside the HTML content
        if post_type == 1:
            urls = []
            for tag in text.extract_iter(post["content"], "<img ", ">"):
                # Look for 'src' anywhere inside the tag instead of only
                # directly after "<img ", so tags with other attributes
                # in front of it (class, style, ...) are found as well.
                # The space swallowed by the "<img " delimiter is put
                # back so that ' src="' cannot match e.g. 'data-src="'.
                src = (" " + tag).partition(' src="')[2].partition('"')[0]
                if src:
                    urls.append(self._strip_query(text.unescape(src)))
            return urls

        # Photo: JSON-encoded list of objects with an "orign" (sic) URL
        if post_type == 2:
            photo_links = util.json_loads(post["photoLinks"])
            return [self._strip_query(link["orign"]) for link in photo_links]

        # Video: JSON-encoded object; its URL is used unmodified
        if post_type == 4:
            embed = util.json_loads(post["embed"])
            return [embed["originUrl"]]

        # Answer: same layout as photo posts, stored under "images"
        if post_type == 5:
            images = util.json_loads(post["images"])
            return [self._strip_query(image["orign"]) for image in images]

        self.log.warning(
            "%s: Unsupported post type '%s'.",
            post["id"], post_type)
        return []

    @staticmethod
    def _strip_query(url):
        """Return 'url' without its '?...' query string"""
        return url.partition("?")[0]
class LofterPostExtractor(LofterExtractor):
    """Extractor for a single post of a lofter blog"""
    subcategory = "post"
    pattern = (r"(?:https?://)?[\w-]+\.lofter\.com"
               r"/post/([0-9a-f]+)_([0-9a-f]+)")
    example = "https://blog_name.lofter.com/post/12345678_90abcdef"

    def posts(self):
        """Return a 1-tuple with the post referenced by the matched URL"""
        # The URL carries both IDs as lowercase hexadecimal strings;
        # the API method is called with their integer values
        blog_hex, post_hex = self.groups
        return (self.api.post(int(blog_hex, 16), int(post_hex, 16)),)
class LofterBlogPostsExtractor(LofterExtractor):
    """Extractor for all posts of a lofter blog"""
    subcategory = "blog-posts"
    # Two URL layouts are accepted. Each one captures the blog name in
    # its own group, so a match fills exactly one of the two groups:
    #   1: https://www.lofter.com/front/blog/home-page/<blog_name>
    #   2: https://<blog_name>.lofter.com/
    # NOTE(review): the second alternative is not anchored at its end and
    # also accepts "www" as a blog name -- confirm whether that is intended.
    pattern = (
        r"(?:https?://)?(?:"
        r"www\.lofter\.com/front/blog/home-page/([\w-]+)|"
        r"([\w-]+)\.lofter\.com"
        r")"
    )
    example = "https://blog_name.lofter.com/"

    def posts(self):
        """Return the posts of the blog named in the matched URL"""
        groups = self.groups
        # Use whichever of the two capture groups holds the blog name
        name = groups[0] if groups[0] else groups[1]
        return self.api.blog_posts(name)
class LofterAPI():
    """Minimal client for the API at https://api.lofter.com

    Requests are issued through the owning extractor's 'request()'
    method and failures are reported through its logger.
    """

    def __init__(self, extractor):
        self.extractor = extractor

    def _call(self, endpoint, data):
        """POST 'data' to 'endpoint' and return the decoded JSON object

        Raises exception.StopExtraction when the status in the response's
        "meta" object is anything other than 200.
        """
        url = "https://api.lofter.com{}".format(endpoint)
        # Sent as query parameter with every request; presumably
        # identifies the client as the Android app -- TODO confirm
        params = {
            'product': 'lofter-android-7.9.10'
        }
        response = self.extractor.request(
            url, method="POST", params=params, data=data)
        info = response.json()

        if info["meta"]["status"] != 200:
            self.extractor.log.debug("Server response: %s", info)
            raise exception.StopExtraction("API request failed")

        return info

    def blog_posts(self, blog_name):
        """Yield the posts of the blog '<blog_name>.lofter.com'

        Pages are requested one after another, each starting at the
        offset returned with the previous page.
        """
        endpoint = "/v2.0/blogHomePage.api"
        params = {
            "method": "getPostLists",
            "offset": 0,
            "limit": 200,
            "blogdomain": "{}.lofter.com".format(blog_name),
        }

        while True:
            data = self._call(endpoint, params)["response"]
            posts = data["posts"]
            yield from posts

            offset = params["offset"]
            next_offset = data["offset"]

            # Stop on an empty page or when the returned offset does not
            # advance; requesting again would repeat the same page forever
            if not posts or next_offset <= offset:
                break
            # Stop when the returned offset lies beyond the posts
            # actually received for this page
            if offset + len(posts) < next_offset:
                break
            params["offset"] = next_offset

    def post(self, blog_id, post_id):
        """Return the post 'post_id' of the blog 'blog_id'

        Both IDs are passed to the API unchanged; the first entry of the
        returned "posts" list is the result.
        """
        endpoint = "/oldapi/post/detail.api"
        params = {
            "targetblogid": blog_id,
            "postid": post_id,
        }
        data = self._call(endpoint, params)
        return data["response"]["posts"][0]