287 lines
9.4 KiB
Python
287 lines
9.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
|
|
"""Extractors for Misskey instances"""
|
|
|
|
from .common import BaseExtractor, Message, Dispatch
|
|
from .. import text, dt
|
|
from ..cache import memcache
|
|
|
|
|
|
class MisskeyExtractor(BaseExtractor):
|
|
"""Base class for Misskey extractors"""
|
|
basecategory = "misskey"
|
|
directory_fmt = ("misskey", "{instance}", "{user[username]}")
|
|
filename_fmt = "{category}_{id}_{file[id]}.{extension}"
|
|
archive_fmt = "{id}_{file[id]}"
|
|
|
|
def _init(self):
|
|
self.api = MisskeyAPI(self)
|
|
self.instance = self.root.rpartition("://")[2]
|
|
self.renotes = True if self.config("renotes", False) else False
|
|
self.replies = True if self.config("replies", True) else False
|
|
|
|
def items(self):
|
|
for note in self.notes():
|
|
if "note" in note:
|
|
note = note["note"]
|
|
files = note.pop("files") or []
|
|
if renote := note.get("renote"):
|
|
if not self.renotes:
|
|
self.log.debug("Skipping %s (renote)", note["id"])
|
|
continue
|
|
files.extend(renote.get("files") or ())
|
|
|
|
if reply := note.get("reply"):
|
|
if not self.replies:
|
|
self.log.debug("Skipping %s (reply)", note["id"])
|
|
continue
|
|
files.extend(reply.get("files") or ())
|
|
|
|
note["instance"] = self.instance
|
|
note["instance_remote"] = note["user"]["host"]
|
|
note["count"] = len(files)
|
|
note["date"] = self.parse_datetime_iso(note["createdAt"])
|
|
|
|
yield Message.Directory, "", note
|
|
for note["num"], file in enumerate(files, 1):
|
|
file["date"] = self.parse_datetime_iso(file["createdAt"])
|
|
note["file"] = file
|
|
url = file["url"]
|
|
yield Message.Url, url, text.nameext_from_url(url, note)
|
|
|
|
def notes(self):
|
|
"""Return an iterable containing all relevant Note objects"""
|
|
return ()
|
|
|
|
def _make_note(self, type, user, url):
|
|
# extract real URL from potential proxy
|
|
path, sep, query = url.partition("?")
|
|
if sep:
|
|
url = text.parse_query(query).get("url") or path
|
|
|
|
return {
|
|
"id" : type,
|
|
"user" : user,
|
|
"files": ({
|
|
"id" : url.rpartition("/")[2].partition(".")[0], # ID from URL
|
|
"url": url,
|
|
"createdAt": "",
|
|
},),
|
|
"createdAt": "",
|
|
}
|
|
|
|
|
|
BASE_PATTERN = MisskeyExtractor.update({
|
|
"misskey.io": {
|
|
"root": "https://misskey.io",
|
|
"pattern": r"misskey\.io",
|
|
},
|
|
"misskey.design": {
|
|
"root": "https://misskey.design",
|
|
"pattern": r"misskey\.design",
|
|
},
|
|
"misskey.art": {
|
|
"root": "https://misskey.art",
|
|
"pattern": r"misskey\.art",
|
|
},
|
|
"lesbian.energy": {
|
|
"root": "https://lesbian.energy",
|
|
"pattern": r"lesbian\.energy",
|
|
},
|
|
"sushi.ski": {
|
|
"root": "https://sushi.ski",
|
|
"pattern": r"sushi\.ski",
|
|
},
|
|
})
|
|
|
|
|
|
class MisskeyUserExtractor(Dispatch, MisskeyExtractor):
|
|
"""Extractor for all images of a Misskey user"""
|
|
subcategory = "user"
|
|
pattern = BASE_PATTERN + r"/@([^/?#]+)/?$"
|
|
example = "https://misskey.io/@USER"
|
|
|
|
def items(self):
|
|
base = f"{self.root}/@{self.groups[-1]}/"
|
|
return self._dispatch_extractors((
|
|
(MisskeyInfoExtractor , base + "info"),
|
|
(MisskeyAvatarExtractor , base + "avatar"),
|
|
(MisskeyBackgroundExtractor, base + "banner"),
|
|
(MisskeyNotesExtractor , base + "notes"),
|
|
), ("notes",))
|
|
|
|
|
|
class MisskeyNotesExtractor(MisskeyExtractor):
|
|
"""Extractor for a Misskey user's notes"""
|
|
subcategory = "notes"
|
|
pattern = BASE_PATTERN + r"/@([^/?#]+)/notes"
|
|
example = "https://misskey.io/@USER/notes"
|
|
|
|
def notes(self):
|
|
return self.api.users_notes(self.api.user_id_by_username(
|
|
self.groups[-1]))
|
|
|
|
|
|
class MisskeyInfoExtractor(MisskeyExtractor):
|
|
"""Extractor for a Misskey user's profile data"""
|
|
subcategory = "info"
|
|
pattern = BASE_PATTERN + r"/@([^/?#]+)/info"
|
|
example = "https://misskey.io/@USER/info"
|
|
|
|
def items(self):
|
|
user = self.api.users_show(self.groups[-1])
|
|
return iter(((Message.Directory, "", user),))
|
|
|
|
|
|
class MisskeyAvatarExtractor(MisskeyExtractor):
|
|
"""Extractor for a Misskey user's avatar"""
|
|
subcategory = "avatar"
|
|
pattern = BASE_PATTERN + r"/@([^/?#]+)/avatar"
|
|
example = "https://misskey.io/@USER/avatar"
|
|
|
|
def notes(self):
|
|
user = self.api.users_show(self.groups[-1])
|
|
url = user.get("avatarUrl")
|
|
return (self._make_note("avatar", user, url),) if url else ()
|
|
|
|
|
|
class MisskeyBackgroundExtractor(MisskeyExtractor):
|
|
"""Extractor for a Misskey user's banner image"""
|
|
subcategory = "background"
|
|
pattern = BASE_PATTERN + r"/@([^/?#]+)/ba(?:nner|ckground)"
|
|
example = "https://misskey.io/@USER/banner"
|
|
|
|
def notes(self):
|
|
user = self.api.users_show(self.groups[-1])
|
|
url = user.get("bannerUrl")
|
|
return (self._make_note("background", user, url),) if url else ()
|
|
|
|
|
|
class MisskeyFollowingExtractor(MisskeyExtractor):
|
|
"""Extractor for followed Misskey users"""
|
|
subcategory = "following"
|
|
pattern = BASE_PATTERN + r"/@([^/?#]+)/following"
|
|
example = "https://misskey.io/@USER/following"
|
|
|
|
def items(self):
|
|
user_id = self.api.user_id_by_username(self.groups[-1])
|
|
for user in self.api.users_following(user_id):
|
|
user = user["followee"]
|
|
url = f"{self.root}/@{user['username']}"
|
|
if (host := user["host"]) is not None:
|
|
url = f"{url}@{host}"
|
|
user["_extractor"] = MisskeyUserExtractor
|
|
yield Message.Queue, url, user
|
|
|
|
|
|
class MisskeyNoteExtractor(MisskeyExtractor):
|
|
"""Extractor for images from a Note"""
|
|
subcategory = "note"
|
|
pattern = BASE_PATTERN + r"/notes/(\w+)"
|
|
example = "https://misskey.io/notes/98765"
|
|
|
|
def notes(self):
|
|
return (self.api.notes_show(self.groups[-1]),)
|
|
|
|
|
|
class MisskeyFavoriteExtractor(MisskeyExtractor):
|
|
"""Extractor for favorited notes"""
|
|
subcategory = "favorite"
|
|
pattern = BASE_PATTERN + r"/(?:my|api/i)/favorites"
|
|
example = "https://misskey.io/my/favorites"
|
|
|
|
def notes(self):
|
|
return self.api.i_favorites()
|
|
|
|
|
|
class MisskeyAPI():
|
|
"""Interface for Misskey API
|
|
|
|
https://github.com/misskey-dev/misskey
|
|
https://misskey-hub.net/en/docs/api/
|
|
https://misskey-hub.net/docs/api/endpoints.html
|
|
"""
|
|
|
|
def __init__(self, extractor):
|
|
self.root = extractor.root
|
|
self.extractor = extractor
|
|
self.access_token = extractor.config("access-token")
|
|
|
|
def user_id_by_username(self, username):
|
|
return self.users_show(username)["id"]
|
|
|
|
def users_following(self, user_id):
|
|
endpoint = "/users/following"
|
|
data = {"userId": user_id}
|
|
return self._pagination(endpoint, data)
|
|
|
|
def users_notes(self, user_id):
|
|
endpoint = "/users/notes"
|
|
data = {"userId": user_id}
|
|
return self._pagination(endpoint, data)
|
|
|
|
@memcache(keyarg=1)
|
|
def users_show(self, username):
|
|
endpoint = "/users/show"
|
|
username, _, host = username.partition("@")
|
|
data = {"username": username, "host": host or None}
|
|
return self._call(endpoint, data)
|
|
|
|
def notes_show(self, note_id):
|
|
endpoint = "/notes/show"
|
|
data = {"noteId": note_id}
|
|
return self._call(endpoint, data)
|
|
|
|
def i_favorites(self):
|
|
endpoint = "/i/favorites"
|
|
if not self.access_token:
|
|
raise self.exc.AuthenticationError()
|
|
data = {"i": self.access_token}
|
|
return self._pagination(endpoint, data)
|
|
|
|
def _call(self, endpoint, data):
|
|
url = f"{self.root}/api{endpoint}"
|
|
return self.extractor.request_json(url, method="POST", json=data)
|
|
|
|
def _pagination(self, endpoint, data):
|
|
extr = self.extractor
|
|
data["limit"] = 100
|
|
data["withRenotes"] = extr.renotes
|
|
data["withFiles"] = False if extr.config("text-posts") else True
|
|
|
|
date_min, date_max = extr._get_date_min_max()
|
|
if (order := extr.config("order-posts")) and \
|
|
order[0] in {"a", "r"}:
|
|
key = "sinceId"
|
|
data["sinceDate"] = 1 if date_min is None else date_min * 1000
|
|
date_stop = None if date_max is None else date_max
|
|
else:
|
|
key = "untilId"
|
|
date_stop = None
|
|
if date_min is not None:
|
|
data["sinceDate"] = date_min * 1000
|
|
if date_max is None:
|
|
# ensure notes are returned in descending order
|
|
data["untilDate"] = (int(dt.time.time()) + 1000) * 1000
|
|
if date_max is not None:
|
|
data["untilDate"] = date_max * 1000
|
|
|
|
while True:
|
|
notes = self._call(endpoint, data)
|
|
if not notes:
|
|
return
|
|
elif date_stop is not None and dt.to_ts(dt.parse_iso(
|
|
notes[-1]["createdAt"])) > date_stop:
|
|
for idx, note in enumerate(notes):
|
|
if dt.to_ts(dt.parse_iso(note["createdAt"])) > date_stop:
|
|
yield from notes[:idx]
|
|
return
|
|
else:
|
|
yield from notes
|
|
|
|
data[key] = notes[-1]["id"]
|