207 lines
6.6 KiB
Python
207 lines
6.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright 2019-2025 Mike Fährmann
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
|
|
"""Extractors for Blogger blogs"""
|
|
|
|
from .common import BaseExtractor, Message
|
|
from .. import text, util
|
|
|
|
|
|
def original(url):
    """Return 'url' rewritten to point at the original-size image.

    Blogger image URLs embed a size specifier ('s1600', 'w400', 'w640-h480',
    ...) as a path segment or query value; replacing it with 's0' requests
    the unscaled original. The scheme is also upgraded to https.
    """
    pattern = text.re(r"(/|=)(?:[sw]\d+|w\d+-h\d+)(?=/|$)")
    url = pattern.sub(r"\1s0", url)
    return url.replace("http:", "https:", 1)
|
|
|
|
|
|
class BloggerExtractor(BaseExtractor):
    """Base class for blogger extractors"""
    basecategory = "blogger"
    directory_fmt = ("blogger", "{blog[name]}",
                     "{post[date]:%Y-%m-%d} {post[title]}")
    filename_fmt = "{num:>03}.{extension}"
    archive_fmt = "{post[id]}_{num}"

    def _init(self):
        self.api = BloggerAPI(self)
        # blog hostname, i.e. everything after the last '/' of the root URL
        self.blog = self.root.rpartition("/")[2]
        # 'videos' option: also extract embedded Blogger videos (default True)
        self.videos = self.config("videos", True)

        if self.videos:
            # matches src URLs of embedded www.blogger.com video players
            self.findall_video = text.re(
                r"""src=["'](https?://www\.blogger\.com"""
                r"""/video\.g\?token=[^"']+)""").findall

    def items(self):
        # Fetch blog metadata and flatten nested API values
        # so they are directly usable in format strings.
        blog = self.api.blog_by_url("http://" + self.blog)
        blog["pages"] = blog["pages"]["totalItems"]
        blog["posts"] = blog["posts"]["totalItems"]
        blog["date"] = self.parse_datetime_iso(blog["published"])
        del blog["selfLink"]

        # image URLs on Google's image hosts embedded in post content
        findall_image = text.re(
            r'src="(https?://(?:'
            r'blogger\.googleusercontent\.com/img|'
            r'lh\d+(?:-\w+)?\.googleusercontent\.com|'
            r'\d+\.bp\.blogspot\.com)/[^"]+)').findall
        metadata = self.metadata()

        for post in self.posts(blog):
            content = post["content"]

            files = findall_image(content)
            for idx, url in enumerate(files):
                # rewrite each image URL to its original-size variant
                files[idx] = original(url)

            # only pay for the extra video requests when the post's HTML
            # actually contains a Blogger video player element
            if self.videos and (
                    'id="BLOG_video-' in content or
                    'class="BLOG_video_' in content):
                self._extract_videos(files, post)

            # flatten nested post values for format strings
            post["author"] = post["author"]["displayName"]
            post["replies"] = post["replies"]["totalItems"]
            post["content"] = text.remove_html(content)
            post["date"] = self.parse_datetime_iso(post["published"])
            del post["selfLink"]
            del post["blog"]

            data = {"blog": blog, "post": post}
            if metadata:
                data.update(metadata)
            yield Message.Directory, "", data

            # 'num' is 1-based and written directly into 'data'
            for data["num"], url in enumerate(files, 1):
                data["url"] = url
                yield Message.Url, url, text.nameext_from_url(url, data)

    def posts(self, blog):
        """Return an iterable with all relevant post objects"""

    def metadata(self):
        """Return additional metadata"""

    def _extract_videos(self, files, post):
        # Request the post's feed entry as JSON; its content field carries
        # the full embed markup including video player URLs.
        url = f"https://{self.blog}/feeds/posts/default/{post['id']}"
        params = {
            "alt" : "json",
            "v" : "2",
            "dynamicviews" : "1",
            "rewriteforssl": "true",
        }

        data = self.request_json(url, params=params)
        html = data["entry"]["content"]["$t"]

        for url in self.findall_video(html):
            # each player page embeds a 'VIDEO_CONFIG' JSON object
            page = self.request(url).text
            video_config = util.json_loads(text.extr(
                page, 'var VIDEO_CONFIG =', '\n'))
            # select the stream with the highest 'format_id'
            # (presumably the best quality — TODO confirm)
            files.append(max(
                video_config["streams"],
                key=lambda x: x["format_id"],
            )["play_url"])
|
|
|
|
|
|
# Register supported hosts with the BaseExtractor machinery and build the
# shared URL-pattern prefix used by all concrete extractor classes below.
# 'root' is None because each *.blogspot.com blog has its own root URL.
BASE_PATTERN = BloggerExtractor.update({
    "blogspot": {
        "root": None,
        "pattern": r"[\w-]+\.blogspot\.com",
    },
})
|
|
|
|
|
|
class BloggerPostExtractor(BloggerExtractor):
    """Extractor for a single blog post"""
    subcategory = "post"
    pattern = BASE_PATTERN + r"(/\d\d\d\d/\d\d/[^/?#]+\.html)"
    example = "https://BLOG.blogspot.com/1970/01/TITLE.html"

    def posts(self, blog):
        # The last pattern group captures the post's path component.
        path = self.groups[-1]
        return (self.api.post_by_path(blog["id"], path),)
|
|
|
|
|
|
class BloggerBlogExtractor(BloggerExtractor):
    """Extractor for an entire Blogger blog"""
    subcategory = "blog"
    pattern = BASE_PATTERN + r"/?$"
    example = "https://BLOG.blogspot.com/"

    def posts(self, blog):
        # Yield every post of the blog via the paginated API.
        blog_id = blog["id"]
        return self.api.blog_posts(blog_id)
|
|
|
|
|
|
class BloggerSearchExtractor(BloggerExtractor):
    """Extractor for Blogger search results"""
    subcategory = "search"
    pattern = BASE_PATTERN + r"/search/?\?q=([^&#]+)"
    example = "https://BLOG.blogspot.com/search?q=QUERY"

    def metadata(self):
        # Decode the URL-encoded query and keep it around for posts().
        query = text.unquote(self.groups[-1])
        self.query = query
        return {"query": query}

    def posts(self, blog):
        return self.api.blog_search(blog["id"], self.query)
|
|
|
|
|
|
class BloggerLabelExtractor(BloggerExtractor):
    """Extractor for Blogger posts by label"""
    subcategory = "label"
    pattern = BASE_PATTERN + r"/search/label/([^/?#]+)"
    example = "https://BLOG.blogspot.com/search/label/LABEL"

    def metadata(self):
        # Decode the URL-encoded label and keep it around for posts().
        label = text.unquote(self.groups[-1])
        self.label = label
        return {"label": label}

    def posts(self, blog):
        return self.api.blog_posts(blog["id"], self.label)
|
|
|
|
|
|
class BloggerAPI():
    """Minimal interface for the Blogger API v3

    https://developers.google.com/blogger
    """
    API_KEY = "AIzaSyCN9ax34oMMyM07g_M-5pjeDp_312eITK8"

    def __init__(self, extractor):
        self.extractor = extractor
        # a user-configured key takes precedence over the built-in default
        self.api_key = extractor.config("api-key") or self.API_KEY

    def blog_by_url(self, url):
        """Retrieve a blog object by its URL"""
        return self._call("/blogs/byurl", {"url": url}, "blog")

    def blog_posts(self, blog_id, label=None):
        """Yield all posts of a blog, optionally filtered by 'label'"""
        return self._pagination(
            f"/blogs/{blog_id}/posts", {"labels": label})

    def blog_search(self, blog_id, query):
        """Yield all posts of a blog matching a search 'query'"""
        return self._pagination(
            f"/blogs/{blog_id}/posts/search", {"q": query})

    def post_by_path(self, blog_id, path):
        """Retrieve a single post object by its URL path"""
        return self._call(
            f"/blogs/{blog_id}/posts/bypath", {"path": path}, "post")

    def _call(self, endpoint, params, notfound=None):
        """Send an authenticated request and return its JSON response"""
        params["key"] = self.api_key
        url = "https://www.googleapis.com/blogger/v3" + endpoint
        return self.extractor.request_json(
            url, params=params, notfound=notfound)

    def _pagination(self, endpoint, params):
        """Yield items from all result pages of an API endpoint"""
        while True:
            data = self._call(endpoint, params)
            yield from data.get("items", ())
            if "nextPageToken" not in data:
                return
            params["pageToken"] = data["nextPageToken"]
|