[kemonoparty] Support /posts endpoint and Creator Tag Calls

- Adding support for calling a creator with a tag selected.
    It is using a legacy endpoint but there is no other way currently
    documented to get the users post filtered by a tag.
- Fixing the User Tags feature to be paginated
    offset is not defined in the API but it is supported.
- Fixed the `/posts` endpoint not working:
    1. Added check along with metadata to make sure there is a
       creator/service information as that is a requirement
    2. Fixed the parameter from tags -> tag.
    3. Fixed the _paginate call to exit correctly when there is
       a key required for the data (it was prematurely exiting)
- Adding a type of caching mechanism for the metadata/user information.
    The current logic would work just fine if looking up for a
    singular user, however for the multiple posts via normal
    filtering would cause it to either:
    This builds a local cache during the process so it should
    only make a call for the user info once during the process.
- Updating to meet standards
    Fixes
      1. Reset formatting for unnecessary line changes
      2. Removed Type Hinting
      3.Replaced f-string with "".format
   Updates
     Renamed function creator_posts_tags -> creator_tagged_posts
     for clarity of what it does (get posts tags vs get tagged posts)
- Fixing check for the length of response:
    1. If it is list - just check len
    2. If there is a key - check that the key length is less
       than the batch.
- add test for '?tag=...' user URLs
    plus some code simplifications
This commit is contained in:
BishopRed
2025-01-14 16:02:58 -06:00
committed by Mike Fährmann
parent 6e3f51a05e
commit b11434a069
2 changed files with 48 additions and 20 deletions

View File

@@ -54,26 +54,19 @@ class KemonopartyExtractor(Extractor):
sort_keys=True, separators=(",", ":")).encode
def items(self):
service = self.groups[2]
creator_id = self.groups[3]
find_hash = re.compile(HASH_PATTERN).match
generators = self._build_file_generators(self.config("files"))
announcements = True if self.config("announcements") else None
comments = True if self.config("comments") else False
duplicates = True if self.config("duplicates") else False
dms = True if self.config("dms") else None
profile = username = None
max_posts = self.config("max-posts")
creator_info = {} if self.config("metadata") else None
# prevent files from being sent with gzip compression
headers = {"Accept-Encoding": "identity"}
if self.config("metadata"):
profile = self.api.creator_profile(service, creator_id)
username = profile["name"]
posts = self.posts()
max_posts = self.config("max-posts")
if max_posts:
posts = itertools.islice(posts, max_posts)
if self.revisions:
@@ -85,10 +78,20 @@ class KemonopartyExtractor(Extractor):
post["_http_headers"] = headers
post["date"] = self._parse_datetime(
post.get("published") or post.get("added") or "")
service = post["service"]
creator_id = post["user"]
if creator_info is not None:
key = "{}_{}".format(service, creator_id)
if key not in creator_info:
creator = creator_info[key] = self.api.creator_profile(
service, creator_id)
else:
creator = creator_info[key]
post["user_profile"] = creator
post["username"] = creator["name"]
if profile is not None:
post["username"] = username
post["user_profile"] = profile
if comments:
try:
post["comments"] = self.api.creator_post_comments(
@@ -171,7 +174,7 @@ class KemonopartyExtractor(Extractor):
try:
msg = '"' + response.json()["error"] + '"'
except Exception:
msg = '"0/1 Username or password is incorrect"'
msg = '"Username or password is incorrect"'
raise exception.AuthenticationError(msg)
return {c.name: c.value for c in response.cookies}
@@ -296,8 +299,12 @@ class KemonopartyUserExtractor(KemonopartyExtractor):
def posts(self):
_, _, service, creator_id, query = self.groups
params = text.parse_query(query)
return self.api.creator_posts(
service, creator_id, params.get("o"), params.get("q"))
if params.get("tag"):
return self.api.creator_tagged_posts(
service, creator_id, params.get("tag"), params.get("o"))
else:
return self.api.creator_posts(
service, creator_id, params.get("o"), params.get("q"))
class KemonopartyPostsExtractor(KemonopartyExtractor):
@@ -493,7 +500,7 @@ class KemonoAPI():
def posts(self, offset=0, query=None, tags=None):
endpoint = "/posts"
params = {"q": query, "o": offset, "tags": tags}
params = {"q": query, "o": offset, "tag": tags}
return self._pagination(endpoint, params, 50, "posts")
def creator_posts(self, service, creator_id, offset=0, query=None):
@@ -501,6 +508,11 @@ class KemonoAPI():
params = {"q": query, "o": offset}
return self._pagination(endpoint, params, 50)
def creator_tagged_posts(self, service, creator_id, tags, offset=0):
endpoint = "/{}/user/{}/posts-legacy".format(service, creator_id)
params = {"o": offset, "tag": tags}
return self._pagination(endpoint, params, 50, "results")
def creator_announcements(self, service, creator_id):
endpoint = "/{}/user/{}/announcements".format(service, creator_id)
return self._call(endpoint)
@@ -565,9 +577,10 @@ class KemonoAPI():
data = self._call(endpoint, params)
if key:
yield from data[key]
else:
yield from data
data = data.get(key)
if not data:
return
yield from data
if len(data) < batch:
return