* [tiktok] always try to resolve JS challenges even if retries is set to 0 * add 1 to tries counter when logging to retain existing logging behavior * clear html data in the case where resolving the challenge worked but extracting the rehydration data afterward did not
1553 lines
60 KiB
Python
1553 lines
60 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
|
|
"""Extractors for https://www.tiktok.com/"""
|
|
|
|
from .common import Extractor, Message, Dispatch
|
|
from .. import text, util, ytdl, exception
|
|
import functools
|
|
import itertools
|
|
import binascii
|
|
import hashlib
|
|
import random
|
|
import time
|
|
|
|
# Regex prefix for the optional scheme, optional "www." and the
# "tiktok.com" / "tiktokv.com" host part of a URL.
BASE_PATTERN = r"(?:https?://)?(?:www\.)?tiktokv?\.com"
# BASE_PATTERN followed by "/@<name>"; group 1 captures the name.
USER_PATTERN = BASE_PATTERN + r"/@([\w_.-]+)"
|
|
|
|
|
|
class TiktokExtractor(Extractor):
    """Base class for TikTok extractors

    'items()' iterates over the post URLs returned by 'self.posts()'
    (provided by subclasses), loads the rehydration data of each post
    and yields the media enabled through the configuration options
    read in '_init()'.
    """
    category = "tiktok"
    directory_fmt = ("{category}", "{user}")
    filename_fmt = (
        "{id}{num:?_//>02} {title[b:150]}{file_id:? [/]/}.{extension}")
    archive_fmt = "{id}_{num}_{file_id}"
    root = "https://www.tiktok.com"
    cookies_domain = ".tiktok.com"
    # Maps a profile URL to its "webapp.user-detail" rehydration data.
    # The dict lives on the class and is mutated in place.
    rehydration_data_cache = {}
    # Holds "webapp.app-context" rehydration data once it has been
    # fetched; the methods below rebind it on the instance.
    rehydration_data_app_context_cache = {}

    def _init(self):
        """Read the extractor's configuration options"""
        self.photo = self.config("photos", True)
        self.audio = self.config("audio", True)
        self.video = self.config("videos", True)
        self.cover = self.config("covers", False)
        self.subtitles = self.config("subtitles", False)

        self.range = self.config("tiktok-range") or ""
        self.range_predicate = util.predicate_range_parse(self.range)

        # If one of these fields is None, the filter for it is disabled.
        # Therefore, if both fields are none, all subtitles are extracted.
        self.subtitle_sources = None
        self.subtitle_langs = None

        if self.subtitles and self.subtitles != "all":
            # Any truthy non-string value selects the "ASR" source.
            if self.subtitles is True or not isinstance(self.subtitles, str):
                self.subtitles = "ASR"

            # Comma-separated entries naming a known source filter by
            # source, every other entry is treated as a language filter.
            known_sources = {"ASR", "MT", "LC"}
            filters = set(self.subtitles.split(","))
            self.subtitle_sources = known_sources.intersection(filters) or None
            self.subtitle_langs = filters.difference(known_sources) or None

    def items(self):
        """Yield Directory and Url messages for every post URL

        A failure while processing one post is logged and does not stop
        the remaining posts from being processed.
        """
        for tiktok_url in self.posts():
            try:
                tiktok_url = self._sanitize_url(tiktok_url)

                data = self._extract_rehydration_data(tiktok_url)
                if "webapp.video-detail" not in data:
                    # Only /video/ links result in the video-detail dict we
                    # need. Try again using that form of link.
                    tiktok_url = self._sanitize_url(
                        data["seo.abtest"]["canonical"])
                    data = self._extract_rehydration_data(tiktok_url)
                video_detail = data["webapp.video-detail"]
                if not self._check_status_code(
                        video_detail, tiktok_url, "post"):
                    continue
                post = video_detail["itemInfo"]["itemStruct"]

                author = post.get("author")
                post["user"] = (author["uniqueId"] or "") if author else ""
                post["date"] = self.parse_timestamp(post["createTime"])
                post["post_type"] = "image" if "imagePost" in post else "video"
                original_title = title = post["desc"]

                yield Message.Directory, "", post
                # Set to "audio" or "video" when the respective option is
                # "ytdl"; the download is then delegated via a "ytdl:" URL.
                ytdl_media = False

                if "imagePost" in post:
                    if self.photo:
                        if not original_title:
                            title = f"TikTok photo #{post['id']}"
                        img_list = post["imagePost"]["images"]
                        for i, img in enumerate(img_list, 1):
                            url = img["imageURL"]["urlList"][0]
                            text.nameext_from_url(url, post)
                            post.update({
                                "type"   : "image",
                                "image"  : img,
                                "title"  : title,
                                "num"    : i,
                                "file_id": post["filename"].partition("~")[0],
                                "width"  : img["imageWidth"],
                                "height" : img["imageHeight"],
                            })
                            yield Message.Url, url, post

                    if self.audio and "music" in post:
                        if self.audio == "ytdl":
                            ytdl_media = "audio"
                        elif url := self._extract_audio(post):
                            yield Message.Url, url, post

                elif "video" in post:
                    if self.video == "ytdl":
                        ytdl_media = "video"
                    elif self.video and (url := self._extract_video(post)):
                        yield Message.Url, url, post
                        # '_fallback' was set by _extract_video() and only
                        # applies to the video file itself.
                        del post["_fallback"]

                    if self.cover:
                        for url in self._extract_covers(post, "video"):
                            yield Message.Url, url, post
                            if self.cover != "all":
                                break

                    if self.subtitles:
                        for url in self._extract_subtitles(post, "video"):
                            yield Message.Url, url, post

                        # remove the subtitle related fields for the next item
                        post.pop("subtitle_lang_id", None)
                        post.pop("subtitle_lang_codename", None)
                        post.pop("subtitle_format", None)
                        post.pop("subtitle_version", None)
                        post.pop("subtitle_source", None)
                else:
                    self.log.info("%s: Skipping post", tiktok_url)

                if ytdl_media:
                    if not original_title:
                        title = f"TikTok {ytdl_media} #{post['id']}"
                    post.update({
                        "type"      : ytdl_media,
                        "image"     : None,
                        "filename"  : "",
                        "extension" :
                            "mp3" if ytdl_media == "audio" else "mp4",
                        "title"     : title,
                        "num"       : 0,
                        "file_id"   : "",
                        "width"     : 0,
                        "height"    : 0,
                    })
                    yield Message.Url, "ytdl:" + tiktok_url, post
            except Exception as exc:
                # Per-post boundary: log and carry on with the next URL.
                self.log.traceback(exc)
                self.log.error("%s: Failed to extract post (%s: %s)",
                               tiktok_url, exc.__class__.__name__, exc)

    def _sanitize_url(self, url):
        """Return 'url' with its first '/photo/' replaced by '/video/'"""
        return text.ensure_http_scheme(url.replace("/photo/", "/video/", 1))

    def _extract_rehydration_data(self, url, additional_keys=(), *,
                                  has_keys=()):
        """Fetch 'url' and return its '__DEFAULT_SCOPE__' rehydration data

        additional_keys: keys applied one after another to descend into
            the data before it is returned.
        has_keys: keys that must be present in the final data; a missing
            key counts as a failed attempt.

        A failed attempt first tries to solve the JavaScript challenge
        and repeats the request immediately. Otherwise the request is
        retried after sleeping until 'self._retries' is exhausted, at
        which point the last ValueError/KeyError is re-raised.
        Raises AuthorizationError on a redirect to a login page.
        """
        tries = 0
        html = None
        challenge_attempt = False
        while True:
            try:
                response = self.request(url)
                if response.history and "/login" in response.url:
                    raise exception.AuthorizationError(
                        "HTTP redirect to login page "
                        f"('{response.url.partition('?')[0]}')")
                html = response.text
                data = text.extr(
                    html, '<script id="__UNIVERSAL_DATA_FOR_REHYDRATION__" '
                    'type="application/json">', '</script>')
                data = util.json_loads(data)["__DEFAULT_SCOPE__"]
                for key in additional_keys:
                    data = data[key]
                for assert_key in has_keys:
                    if assert_key not in data:
                        raise KeyError(assert_key)
                return data
            except (ValueError, KeyError):
                # Even if the retries option has been set to 0, we should
                # always at least try to solve the JS challenge and go again
                # immediately.
                if not challenge_attempt:
                    challenge_attempt = True
                    self.log.info("Solving JavaScript challenge")
                    try:
                        self._solve_challenge(html)
                        html = None
                        continue
                    except Exception as exc:
                        self.log.traceback(exc)
                        self.log.warning(
                            "%s: Failed to solve JavaScript challenge. If you "
                            "keep encountering this issue, please try again "
                            "with the --write-pages option and include the "
                            "resulting page in your bug report",
                            url.rpartition("/")[2])

                # We've already tried resolving the challenge, and either
                # resolving it failed, or resolving it didn't get us the
                # rehydration data, so fail this attempt.
                self.log.warning("%s: Failed to retrieve rehydration data "
                                 "(%s/%s)", url.rpartition("/")[2], tries + 1,
                                 self._retries)
                if tries >= self._retries:
                    raise
                tries += 1
                self.sleep(self._timeout, "retry")
                # The next attempt may try the challenge again.
                challenge_attempt = False
                html = None

    def _extract_rehydration_data_user(self, profile_url, additional_keys=()):
        """Return the (cached) 'webapp.user-detail' data of a profile

        additional_keys: keys applied one after another to descend into
            the user detail data before it is returned.

        Raises ExtractionError if the status code check fails or one of
        'additional_keys' is missing.
        """
        if profile_url in self.rehydration_data_cache:
            data = self.rehydration_data_cache[profile_url]
        else:
            data = self._extract_rehydration_data(
                profile_url,
                has_keys=["webapp.user-detail", "webapp.app-context"]
            )
            self.rehydration_data_cache[profile_url] = \
                data["webapp.user-detail"]
            self.rehydration_data_app_context_cache = \
                data["webapp.app-context"]
            data = data["webapp.user-detail"]
        if not self._check_status_code(data, profile_url, "profile"):
            raise exception.ExtractionError(
                f"{profile_url}: could not extract rehydration data")
        try:
            for key in additional_keys:
                data = data[key]
        except KeyError as exc:
            self.log.traceback(exc)
            # Build the message here; exception constructors do not apply
            # logging-style '%s' arguments.
            raise exception.ExtractionError(
                f"{profile_url}: could not extract rehydration data "
                f"({', '.join(additional_keys)})") from exc
        return data

    def _ensure_rehydration_data_app_context_cache_is_populated(self):
        """Fetch the 'webapp.app-context' data if the cache is empty"""
        if not self.rehydration_data_app_context_cache:
            self.rehydration_data_app_context_cache = \
                self._extract_rehydration_data(
                    "https://www.tiktok.com/", ["webapp.app-context"])

    def _solve_challenge(self, html):
        """Solve the challenge embedded in 'html' and set its cookies

        Raises ExtractionError if no index below 1,000,000 produces the
        expected digest.
        """
        cs = text.extr(text.extr(html, 'id="cs"', '>'), 'class="', '"')
        c = util.json_loads(binascii.a2b_base64(cs + "==").decode())

        # find index of expected digest
        expected = binascii.a2b_base64(c["v"]["c"] + "==")
        base = hashlib.sha256(binascii.a2b_base64(c["v"]["a"] + "=="))
        for idx in range(1_000_000):
            test = base.copy()
            test.update(str(idx).encode())
            if test.digest() == expected:
                break
        else:
            raise exception.ExtractionError("failed to find matching digest")

        # extract cookie names
        wci = text.extr(text.extr(html, 'id="wci"', '>'), 'class="', '"')
        rci = text.extr(text.extr(html, 'id="rci"', '>'), 'class="', '"')
        rs = text.extr(text.extr(html, 'id="rs"', '>'), 'class="', '"')

        # set cookie values
        domain = self.cookies_domain
        expires = int(time.time()) + 5
        c["d"] = binascii.b2a_base64(str(idx).encode(), newline=False).decode()
        v = binascii.b2a_base64(util.json_dumps(c).encode(), newline=False)
        self.cookies.set(wci, v.decode(), domain=domain, expires=expires)
        if rs:
            self.cookies.set(rci, rs, domain=domain, expires=expires)

    def _extract_sec_uid(self, profile_url, user_name):
        """Return the secondary user ID or raise AbortExtraction"""
        sec_uid = self._extract_id(
            profile_url, user_name, r"MS4wLjABAAAA[\w-]{64}", "secUid")
        if sec_uid is None:
            raise exception.AbortExtraction(
                f"{user_name}: unable to extract secondary user ID")
        return sec_uid

    def _extract_author_id(self, profile_url, user_name):
        """Return the numeric user ID or raise AbortExtraction"""
        author_id = self._extract_id(
            profile_url, user_name, r"[0-9]+", "id")
        if author_id is None:
            raise exception.AbortExtraction(
                f"{user_name}: unable to extract user ID")
        return author_id

    def _extract_id(self, profile_url, user_name, regex, id_key):
        """Return an ID fully matching 'regex', or None

        'user_name' itself is returned when it already matches;
        otherwise the value of 'id_key' is read from the profile's
        rehydration data.
        """
        match = text.re(regex).fullmatch

        if match(user_name) is not None:
            # If it was provided in the URL, then we can skip extracting it
            # from the rehydration data.
            return user_name

        value = self._extract_rehydration_data_user(
            profile_url, ("userInfo", "user", id_key))
        return None if match(value) is None else value

    def _extract_video(self, post):
        """Update 'post' with video metadata and return the primary URL

        The remaining candidate URLs are stored in post["_fallback"].
        Raises ExtractionError if no video URL could be found.
        """
        video = post["video"]
        urls = self._extract_video_urls(video)
        if not urls:
            raise exception.ExtractionError(
                f"{post['id']}: Failed to extract video URLs. "
                f"You may need cookies to continue.")

        url = urls[0]
        text.nameext_from_url(url, post)
        post.update({
            "_fallback": urls[1:],
            "type"     : "video",
            "image"    : None,
            "title"    : post["desc"] or f"TikTok video #{post['id']}",
            "duration" : video.get("duration"),
            "num"      : 0,
            "file_id"  : "",
            "width"    : video.get("width"),
            "height"   : video.get("height"),
        })
        if not post["extension"]:
            post["extension"] = video.get("format", "mp4")
        return url

    def _extract_video_urls(self, video):
        """Return the candidate video URLs of 'video'

        URLs from 'bitrateInfo' come first, ordered by descending
        width * height; the root 'playAddr' is appended last.
        """
        # First, look for bitrateInfo.
        # This will include URLs pointing to the best quality videos.
        if "bitrateInfo" in video:
            bitrate_info = video["bitrateInfo"]
            if not isinstance(bitrate_info, list):
                bitrate_info = [bitrate_info]
            bitrate_urls = {}
            for video_info in bitrate_info:
                play_addr = video_info["PlayAddr"]
                width = text.parse_int(play_addr.get("Width"))
                height = text.parse_int(play_addr.get("Height"))
                size = width * height
                # A missing/None "UrlList" contributes no URLs instead of
                # raising; the source list itself is never modified.
                bitrate_urls.setdefault(size, []).extend(
                    play_addr.get("UrlList") or ())
            # Sort the URLs by descending quality.
            sizes = sorted(bitrate_urls, reverse=True)
            urls = [url for size in sizes for url in bitrate_urls[size]]
        else:
            urls = []

        # As a fallback, try to look for the root playAddr,
        # which won't necessarily point to the best quality.
        if "playAddr" in video:
            urls.append(video["playAddr"])

        return urls

    def _extract_audio(self, post):
        """Update 'post' with audio metadata and return the audio URL"""
        audio = post["music"]
        url = audio["playUrl"]
        text.nameext_from_url(url, post)
        post.update({
            "type"     : "audio",
            "image"    : None,
            "title"    : post["desc"] or f"TikTok audio #{post['id']}",
            "duration" : audio.get("duration"),
            "num"      : 0,
            "file_id"  : audio.get("id"),
            "width"    : 0,
            "height"   : 0,
        })
        if not post["extension"]:
            post["extension"] = "mp3"
        return url

    def _extract_covers(self, post, type):
        """Yield the cover URLs of post[type]

        'post' is updated with the metadata of a cover right before its
        URL is yielded.
        """
        media = post[type]

        for cover_id in ("thumbnail", "cover", "originCover", "dynamicCover"):
            if url := media.get(cover_id):
                text.nameext_from_url(url, post)
                post.update({
                    "type"     : "cover",
                    "extension": "jpg",
                    "image"    : url,
                    "title"    : post["desc"] or
                    f"TikTok {type} cover #{post['id']}",
                    "duration" : media.get("duration"),
                    "num"      : 0,
                    "file_id"  : cover_id,
                    "width"    : 0,
                    "height"   : 0,
                })
                yield url

    def _extract_subtitles(self, post, type):
        """Yield the URLs of the subtitles of post[type] passing the filters

        'post' is updated with the metadata of a subtitle right before
        its URL is yielded.
        """
        media = post[type]
        sources_filtered = self.subtitle_sources is not None
        langs_filtered = self.subtitle_langs is not None

        for subtitle in media.get("subtitleInfos", ()):
            sub_lang_id = subtitle.get("LanguageID")
            sub_lang_codename = subtitle.get("LanguageCodeName")
            sub_format = subtitle.get("Format")
            sub_version = subtitle.get("Version")
            sub_source = subtitle.get("Source")

            # guard the iterable access
            sources_match = sources_filtered and \
                sub_source in self.subtitle_sources
            langs_match = langs_filtered and \
                sub_lang_codename in self.subtitle_langs

            # Subtitles will be extracted when either filter matches.
            if not sources_match and not langs_match and \
                    (sources_filtered or langs_filtered):
                continue

            if url := subtitle.get("Url"):
                text.nameext_from_url(url, post)

                # subtitle urls may not specify a filename,
                # so the metadata can be used to build one.
                if not post["filename"]:
                    post["filename"] = (f"{post['id']}_{sub_lang_codename}_"
                                        f"{sub_version}_{sub_source}")
                    post["extension"] = sub_format.lower()

                    # replace extensions for known formats
                    if post["extension"] == "webvtt":
                        post["extension"] = "vtt"
                    elif post["extension"] == "creator_caption":
                        post["extension"] = "json"

                post.update({
                    "type"     : "subtitle",
                    "image"    : None,
                    "title"    :
                        post["desc"] or
                        f"TikTok {type} subtitle #{post['id']}",
                    "duration" : media.get("duration"),
                    "num"      : 0,
                    "file_id"  :
                        f"{sub_lang_id}_{sub_lang_codename}_{sub_source}_"
                        f"{sub_version}_{sub_format}",
                    "subtitle_lang_id"      : sub_lang_id,
                    "subtitle_lang_codename": sub_lang_codename,
                    "subtitle_format"       : sub_format,
                    "subtitle_version"      : sub_version,
                    "subtitle_source"       : sub_source,
                    "width"    : 0,
                    "height"   : 0,
                })
                yield url

    def _check_status_code(self, detail, url, type_of_url):
        """Return True if 'detail' does not report an error

        A falsy or missing 'statusCode' counts as success. Every other
        case is logged as an error and returns False, except status
        10222 with a positive video count.
        """
        status = detail.get("statusCode")
        if not status:
            return True

        if status == 10222:
            # Video count workaround ported from yt-dlp: sometimes TikTok
            # reports a profile as private even though we have the cookies to
            # access it. We know that we can access it if we can see the
            # videos stats. If we can't, we assume that we don't have access
            # to the profile.
            # We only care about this workaround for webapp.user-detail
            # objects, so always fail the workaround for e.g.
            # webapp.video-detail objects.
            video_count = self._extract_video_count_from_user_detail(detail)
            if video_count is None:
                self.log.error("%s: Login required to access this %s", url,
                               type_of_url)
            elif video_count > 0:
                return True
            else:
                self.log.error("%s: Login required to access this %s, or this "
                               "profile has no videos posted", url,
                               type_of_url)
        elif status == 10221:
            self.log.error("%s: User account could not be found", url)
        elif status == 10204:
            self.log.error("%s: Requested %s not available", url, type_of_url)
        elif status == 10231:
            self.log.error("%s: Region locked - Try downloading with a "
                           "VPN/proxy connection", url)
        else:
            self.log.error(
                "%s: Received unknown error code %s ('%s')",
                url, status, detail.get("statusMsg") or "")
        return False

    def _extract_video_count_from_user_detail(self, detail):
        """Return the video count found in 'detail' as int, or None

        None is returned when 'userInfo', its stats, or a usable
        'videoCount' value is missing.
        """
        user_info = detail.get("userInfo")
        if not user_info:
            return None
        stats = user_info.get("stats") or user_info.get("statsV2")
        try:
            # stats.videoCount is an int, but statsV2.videoCount is a
            # string, so we must explicitly convert the attribute.
            return int(stats["videoCount"])
        except (KeyError, TypeError, ValueError):
            # TypeError: neither stats object exists (stats is None) or
            # videoCount is None.
            return None
|
|
|
|
|
|
class TiktokPostExtractor(TiktokExtractor):
    """Extract a single video or photo TikTok link"""
    subcategory = "post"
    pattern = BASE_PATTERN + r"/(?:@([\w_.-]*)|share)/(?:phot|vide)o/(\d+)"
    example = "https://www.tiktok.com/@USER/photo/1234567890"

    def posts(self):
        """Return a mapping whose single key is the post's /video/ URL"""
        user_name, post_id = self.groups
        # The user group is unset for /share/ links.
        if not user_name:
            user_name = ""
        return {f"{self.root}/@{user_name}/video/{post_id}": None}
|
|
|
|
|
|
class TiktokVmpostExtractor(TiktokExtractor):
    """Extract a single video or photo TikTok VM link"""
    subcategory = "vmpost"
    pattern = (r"(?:https?://)?(?:"
               r"(?:v[mt]\.)?tiktok\.com|(?:www\.)?tiktok\.com/t"
               r")/(?!@)([^/?#]+)")
    example = "https://vm.tiktok.com/1a2B3c4E5"

    def items(self):
        """Resolve the short link and queue its target for the post
        extractor"""
        location = self.request_location(
            text.ensure_http_scheme(self.url),
            headers={"User-Agent": "facebookexternalhit/1.1"},
            notfound="post")

        # A target no longer than this example is not a post URL:
        # https://www.tiktok.com/?_r=1
        if not location or len(location) <= 28:
            raise exception.NotFoundError("post")

        target = location.partition("?")[0]
        yield Message.Queue, target, {"_extractor": TiktokPostExtractor}
|
|
|
|
|
|
class TiktokUserExtractor(Dispatch, TiktokExtractor):
    """Extractor for a TikTok user profile"""
    pattern = USER_PATTERN + r"/?(?:$|\?|#)"
    example = "https://www.tiktok.com/@USER"

    def items(self):
        """Dispatch to the sub-extractors of this profile

        "avatar" and "posts" are passed as the default selection.
        """
        profile = f"{self.root}/@{self.groups[0]}/"
        extractors = (
            (TiktokAvatarExtractor , profile + "avatar"),
            (TiktokPostsExtractor  , profile + "posts"),
            (TiktokRepostsExtractor, profile + "reposts"),
            (TiktokStoriesExtractor, profile + "stories"),
            (TiktokLikesExtractor  , profile + "likes"),
            (TiktokSavedExtractor  , profile + "saved"),
        )
        defaults = ("avatar", "posts")
        return self._dispatch_extractors(extractors, defaults)
|
|
|
|
|
|
class TiktokAvatarExtractor(TiktokExtractor):
    """Extractor for the avatar of a TikTok user"""
    subcategory = "avatar"
    pattern = USER_PATTERN + r"/avatar"
    example = "https://www.tiktok.com/@USER/avatar"

    def items(self):
        """Yield a Directory and a Url message for the user's avatar"""
        name = self.groups[0]
        user = self._extract_rehydration_data_user(
            f"{self.root}/@{name}", ("userInfo", "user"))
        user["user"] = user.get("uniqueId", name)

        # Prefer the largest avatar variant that is available.
        src = user.get("avatarLarger")
        if not src:
            src = user.get("avatarMedium")
        if not src:
            src = user["avatarThumb"]

        avatar = text.nameext_from_url(src, user.copy())
        avatar.update({
            "type"   : "avatar",
            "title"  : "@" + user["user"],
            "id"     : user["id"],
            "file_id": avatar["filename"].partition("~")[0],
            "num"    : 0,
        })

        yield Message.Directory, "", avatar
        yield Message.Url, src, avatar
|
|
|
|
|
|
class TiktokPostsExtractor(TiktokExtractor):
    """Extractor for the posts of a TikTok user"""
    subcategory = "posts"
    pattern = USER_PATTERN + r"/posts"
    example = "https://www.tiktok.com/@USER/posts"

    def posts(self):
        """Return the user's post URLs, or an empty tuple on failure"""
        user_name = self.groups[0]
        profile_url = f"{self.root}/@{user_name}"
        self.user_provided_cookies = bool(self.cookies)

        # If set to "ytdl", we shall first go via yt-dlp. If that fails,
        # we shall attempt to extract directly.
        tried_ytdl = bool(self.config("ytdl", False))
        if tried_ytdl:
            found = self._extract_posts_ytdl(profile_url)
            if found:
                return found
            self.log.warning("Could not extract TikTok user "
                             f"{user_name} via yt-dlp or youtube-dl, "
                             "attempting the extraction directly")

        found = self._extract_posts_api(profile_url, user_name)
        if found:
            return found

        message = "Could not extract any posts from TikTok user " \
            f"{user_name}"
        if not tried_ytdl:
            message += ", try extracting post information using " \
                "yt-dlp with the -o ytdl=true argument"
        self.log.warning(message)
        return ()

    def _extract_posts_ytdl(self, profile_url):
        """Return the profile's post URLs as listed by the ytdl module

        Returns an empty list if the module cannot be imported.
        """
        try:
            module = ytdl.import_module(self.config("module"))
        except (ImportError, SyntaxError) as exc:
            self.log.error("Cannot import module '%s'",
                           getattr(exc, "name", ""))
            self.log.traceback(exc)
            return []

        extr_opts = {
            "extract_flat"           : True,
            "ignore_no_formats_error": True,
        }
        user_opts = {
            "retries"                : self._retries,
            "socket_timeout"         : self._timeout,
            "nocheckcertificate"     : not self._verify,
            "playlist_items"         : str(self.range),
        }
        if self._proxies:
            user_opts["proxy"] = self._proxies.get("http")

        ytdl_instance = ytdl.construct_YoutubeDL(
            module, self, user_opts, extr_opts)

        # Transfer cookies to ytdl.
        if self.cookies:
            jar = ytdl_instance.cookiejar
            for cookie in self.cookies:
                jar.set_cookie(cookie)

        with ytdl_instance as ydl:
            info_extractor = ydl.get_info_extractor("TikTokUser")
            info_dict = ydl._YoutubeDL__extract_info(
                profile_url, info_extractor, False, {}, True)
            # This should be a list of video and photo post URLs in /video/
            # format.
            urls = []
            for entry in info_dict["entries"]:
                urls.append(entry["url"].partition("?")[0])
            return urls

    def _extract_posts_api(self, profile_url, user_name):
        """Return the profile's post URLs through the item list endpoints

        The legacy endpoint is used for descending order, when no
        cookies were provided, and as the fallback when the ordered
        endpoint fails or returns nothing.
        """
        order = self.config("order-posts") or "desc"
        if order not in ("desc", "asc", "reverse", "popular"):
            order = "desc"
        self.post_order = order
        sec_uid = self._extract_sec_uid(profile_url, user_name)

        def legacy():
            return self._extract_posts_api_legacy(
                profile_url, sec_uid, self.range_predicate)

        # If descending order is requested, opt for the more reliable legacy
        # endpoint instead of trying with the "newer", flakier endpoint.
        if order == "desc":
            return legacy()

        if not self.user_provided_cookies:
            self.log.warning(
                "%s: no cookies have been provided so the order-posts "
                "option will not take effect. You must provide cookies in "
                "order to extract a profile's posts in non-descending "
                "order",
                profile_url
            )
            return legacy()

        try:
            urls = self._extract_posts_api_order(
                profile_url, sec_uid, self.range_predicate)
        except Exception as exc:
            self.log.traceback(exc)
        else:
            if urls:
                return urls

        self.log.error(
            "%s: failed to extract user posts using post/item_list (make sure "
            "you provide valid cookies). Attempting with legacy "
            "creator/item_list endpoint that does not support post ordering",
            profile_url
        )
        return legacy()

    def _extract_posts_api_order(self, profile_url, sec_uid, range_predicate):
        """Return post URLs in 'self.post_order' order

        Returns an empty list if executing the request reports failure.
        """
        if self.post_order == "asc" or self.post_order == "reverse":
            request_type = "2"
        elif self.post_order == "popular":
            request_type = "1"
        else:
            request_type = "0"

        request = TiktokPostItemListRequest(range_predicate)
        succeeded = request.execute(self, profile_url, {
            "secUid": sec_uid,
            "post_item_list_request_type": request_type,
            "count": "15",
            "needPinnedItemIds": "false",
        })
        if not succeeded:
            return []
        return request.generate_urls(profile_url, self.video, self.photo,
                                     self.audio)

    def _extract_posts_api_legacy(self, profile_url, sec_uid, range_predicate):
        """Return post URLs using the creator item list request"""
        request = TiktokCreatorItemListRequest(range_predicate)
        request.execute(self, profile_url, {
            "secUid": sec_uid,
            "type": "1",
            "count": "15",
        })
        return request.generate_urls(profile_url, self.video, self.photo,
                                     self.audio)
|
|
|
|
|
|
class TiktokRepostsExtractor(TiktokExtractor):
    """Extractor for the reposts of a TikTok user"""
    subcategory = "reposts"
    pattern = USER_PATTERN + r"/reposts"
    example = "https://www.tiktok.com/@USER/reposts"

    def posts(self):
        """Return the URLs generated by a repost item list request"""
        name = self.groups[0]
        profile = f"{self.root}/@{name}"
        sec_uid = self._extract_sec_uid(profile, name)

        request = TiktokRepostItemListRequest(self.range_predicate)
        request.execute(self, profile, {
            "secUid": sec_uid,
            "post_item_list_request_type": "0",
            "needPinnedItemIds": "false",
            "count": "15",
        })
        return request.generate_urls(
            profile, self.video, self.photo, self.audio)
|
|
|
|
|
|
class TiktokStoriesExtractor(TiktokExtractor):
    """Extractor for the stories of a TikTok user"""
    subcategory = "stories"
    pattern = USER_PATTERN + r"/stories"
    example = "https://www.tiktok.com/@USER/stories"

    def posts(self):
        """Return the URLs generated by a story item list request"""
        name = self.groups[0]
        profile = f"{self.root}/@{name}"
        author_id = self._extract_author_id(profile, name)

        request = TiktokStoryItemListRequest()
        request.execute(self, profile, {
            "authorId": author_id,
            "loadBackward": "false",
            "count": "5",
        })
        return request.generate_urls(
            profile, self.video, self.photo, self.audio)
|
|
|
|
|
|
class TiktokLikesExtractor(TiktokExtractor):
    """Extractor for the liked posts of a TikTok user"""
    subcategory = "likes"
    pattern = USER_PATTERN + r"/like[sd]"
    example = "https://www.tiktok.com/@USER/liked"

    def posts(self):
        """Return the URLs generated by a favorite item list request"""
        name = self.groups[0]
        profile = f"{self.root}/@{name}"
        sec_uid = self._extract_sec_uid(profile, name)

        request = TiktokFavoriteItemListRequest(self.range_predicate)
        request.execute(self, profile, {
            "secUid": sec_uid,
            "post_item_list_request_type": "0",
            "needPinnedItemIds": "false",
            "count": "15",
        })
        return request.generate_urls(
            profile, self.video, self.photo, self.audio)
|
|
|
|
|
|
class TiktokSavedExtractor(TiktokExtractor):
    """Extractor for the saved posts of a TikTok user"""
    subcategory = "saved"
    pattern = USER_PATTERN + r"/saved"
    example = "https://www.tiktok.com/@USER/saved"

    def posts(self):
        """Return the URLs generated by a saved post item list request"""
        name = self.groups[0]
        profile = f"{self.root}/@{name}"
        sec_uid = self._extract_sec_uid(profile, name)

        request = TiktokSavedPostItemListRequest(self.range_predicate)
        request.execute(self, profile, {
            "secUid": sec_uid,
            "post_item_list_request_type": "0",
            "needPinnedItemIds": "false",
            "count": "15",
        })
        return request.generate_urls(
            profile, self.video, self.photo, self.audio)
|
|
|
|
|
|
class TiktokFollowingExtractor(TiktokExtractor):
    """Extract all of the stories of all of the users you follow"""
    subcategory = "following"
    pattern = rf"{BASE_PATTERN}/following"
    example = "https://www.tiktok.com/following"

    def posts(self):
        """Attempt to extract all of the stories of all of the accounts
        the user follows"""

        user_list = TiktokStoryUserListRequest()
        listed = user_list.execute(self, self.url, {
            "storyFeedScene": "3",
            "count": "15",
        })
        if not listed:
            self.log.error("%s: could not extract follower list, make sure "
                           "you are using logged-in cookies", self.url)
        users = user_list.generate_urls()
        if len(users) == 0:
            self.log.warning("%s: No followers with stories could be "
                             "extracted", self.url)

        entries = {}
        # Batch all of the users up into groups of at most ten and use the
        # batch endpoint to improve performance. The response to the story user
        # list request may also include the user themselves, so skip them if
        # they ever turn up.
        for start in range(0, len(users), 10):
            batch_number = start // 10 + 1
            user_batch = users[start:start + 10]

            # Handle edge case where final batch is composed of a single user
            # and that user is the one we need to skip. If we don't handle this
            # here (or when we generate the author ID list later), we will
            # trigger an AssertionError for an empty author ID list.
            if len(user_batch) == 1 and \
                    self._is_current_user(user_batch[0][0]):
                continue

            shown = [profile_url for user_id, profile_url in user_batch
                     if not self._is_current_user(user_id)]
            self.log.info("TikTok user stories, batch %d: %s", batch_number,
                          ", ".join(shown))

            # Since we've already extracted all of the author IDs, we should be
            # able to avoid having to request rehydration data (except for one
            # time, since it's required to make _is_current_user() work), but
            # we should keep this mechanism in place for safety.
            author_ids = [self._extract_author_id(profile_url, user_id)
                          for user_id, profile_url in user_batch
                          if not self._is_current_user(user_id)]

            batch = TiktokStoryBatchItemListRequest()
            batch.execute(self, f"Batch {batch_number}", {
                "authorIds": ",".join(author_ids),
                "storyCallScene": "2",
            })
            # We technically don't need to have the correct user name in the
            # URL and it's easier to just ignore it here.
            entries.update(batch.generate_urls("https://www.tiktok.com/@_",
                                               self.video, self.photo,
                                               self.audio))

        return entries

    def _is_current_user(self, user_id):
        """Return True if 'user_id' equals the 'uid' of the 'user' entry
        in the app context data"""
        self._ensure_rehydration_data_app_context_cache_is_populated()
        context = self.rehydration_data_app_context_cache
        if "user" not in context:
            return False
        current = context["user"]
        if "uid" not in current:
            return False
        return current["uid"] == user_id
|
|
|
|
|
|
class TiktokPaginationCursor:
    """Base pagination cursor

    This base implementation always reports page 0 and signals the end
    of the data right away; subclasses override both methods.
    """

    def current_page(self):
        """Must return the page the cursor is currently pointing to.

        Returns
        -------
        int
            The current value of the cursor.
        """
        page = 0
        return page

    def next_page(self, data, query_parameters):
        """Must progress the cursor to the next page.

        Parameters
        ----------
        data : dict
            The response of the most recent request.
        query_parameters : dict
            All of the query parameters used for the most recent
            request.

        Returns
        -------
        bool
            True if the cursor detects that we've reached the end, False
            otherwise.
        """
        reached_end = True
        return reached_end
|
|
|
|
|
|
class TiktokTimeCursor(TiktokPaginationCursor):
    """Cursor that follows the cursor value reported by each response"""

    def __init__(self, *, reverse=True, has_more_attribute="hasMore",
                 cursor_attribute="cursor"):
        super().__init__()
        self.cursor = 0
        # If we expect the cursor to go up or down as we go to the next page.
        # True for down, False for up.
        self.reverse = reverse
        # Names of the response fields this cursor reads.
        self.has_more_key = has_more_attribute
        self.cursor_key = cursor_attribute

    def current_page(self):
        """Return the current cursor value"""
        return self.cursor

    def next_page(self, data, query_parameters):
        """Advance the cursor from 'data' and return True at the end

        Raises ExtractionError if the first response (cursor still 0)
        provides no cursor value.
        """
        on_first_page = self.cursor == 0
        candidate = int(data.get(self.cursor_key, 0))
        missing = not candidate

        if on_first_page:
            # There is nothing to compare against yet, so a missing
            # cursor cannot be recovered from.
            if missing:
                raise exception.ExtractionError(
                    "Could not extract next cursor")
        elif self.reverse:
            # If the new cursor doesn't go in the direction we expect, use
            # the fallback logic instead.
            if candidate > self.cursor or missing:
                candidate = self.fallback_cursor(data)
        elif candidate < self.cursor or missing:
            candidate = self.fallback_cursor(data)

        self.cursor = candidate
        return not data.get(self.has_more_key, False)

    def fallback_cursor(self, data):
        """Return a replacement cursor value

        Uses the 'createTime' of the last 'itemList' entry times 1000,
        or plus/minus 7 * 86_400_000 if that is unavailable.
        """
        # NOTE(review): next_page() stores the returned value as the new
        # cursor, so the constant below is used as an absolute value,
        # not as an offset from the current cursor - confirm intent.
        try:
            last_item = data["itemList"][-1]
            return int(last_item["createTime"]) * 1000
        except Exception:
            direction = -1 if self.reverse else 1
            return 7 * 86_400_000 * direction
|
|
|
|
|
|
class TiktokForwardTimeCursor(TiktokTimeCursor):
    """Time cursor whose value is expected to go up from page to page."""

    def __init__(self):
        # reverse=False: the cursor is expected to increase.
        super().__init__(reverse=False)
|
|
|
|
|
|
class TiktokBackwardTimeCursor(TiktokTimeCursor):
    """Time cursor whose value is expected to go down from page to page."""

    def __init__(self):
        # reverse=True: the cursor is expected to decrease.
        super().__init__(reverse=True)
|
|
|
|
|
|
class TiktokPopularTimeCursor(TiktokTimeCursor):
    """Descending cursor with a fixed fallback value instead of one
    derived from item timestamps."""

    def __init__(self):
        # reverse=True: the cursor is expected to decrease.
        super().__init__(reverse=True)

    def fallback_cursor(self, data):
        """Return the constant cursor used when the response's cursor is
        missing or moves in the wrong direction. *data* is ignored."""
        # Don't really know what to do here, all I know is that the cursor
        # for the popular item feed goes down and it does not appear to be
        # based on item list timestamps at all.
        # NOTE(review): this is an absolute value, not an offset from
        # self.cursor, so repeated fallbacks yield the same cursor --
        # confirm this is intended.
        return -50_000
|
|
|
|
|
|
class TiktokStoryTimeCursor(TiktokTimeCursor):
    """Ascending time cursor that reads "MaxCursor" and "HasMoreAfter"
    from the response instead of "cursor" and "hasMore"."""

    def __init__(self):
        super().__init__(reverse=False, has_more_attribute="HasMoreAfter",
                         cursor_attribute="MaxCursor")
|
|
|
|
|
|
class TiktokLegacyTimeCursor(TiktokPaginationCursor):
    """Cursor that starts at the current time and walks backwards using
    the creation time of the last item on each page."""

    def __init__(self):
        super().__init__()
        now_in_seconds = int(time.time())
        self.cursor = now_in_seconds * 1000

    def current_page(self):
        """Return the current cursor value."""
        return self.cursor

    def next_page(self, data, query_parameters):
        """Move the cursor back in time.

        Parameters
        ----------
        data : dict
            The response of the most recent request.
        query_parameters : dict
            The query parameters of the most recent request (unused).

        Returns
        -------
        bool
            True if the cursor fell below the hard lower bound or the
            response does not report earlier items, False otherwise.
        """
        previous = self.cursor
        try:
            candidate = int(data["itemList"][-1]["createTime"]) * 1000
        except Exception:
            candidate = 0
        if candidate and candidate != previous:
            self.cursor = candidate
        else:
            # User may not have posted within this ~1 week look back,
            # so manually adjust the cursor.
            self.cursor = previous - 7 * 86_400_000
        has_more_previous = data.get("hasMorePrevious")
        # In case 'hasMorePrevious' is wrong, break if we have
        # gone back before TikTok existed.
        before_tiktok = self.cursor < 1472706000000
        return before_tiktok or not has_more_previous
|
|
|
|
|
|
class TiktokItemCursor(TiktokPaginationCursor):
    """Offset cursor that advances by the number of items received."""

    def __init__(self, list_key: str = "itemList"):
        """Create a cursor starting at offset 0.

        Parameters
        ----------
        list_key : str
            Key of the response field holding the list of items.
        """
        super().__init__()
        self.cursor = 0
        self.list_key = list_key

    def current_page(self):
        """Return the current offset."""
        return self.cursor

    def next_page(self, data, query_parameters):
        """Advance the offset by the size of the returned item list.

        Returns
        -------
        bool
            True if the response does not report more items; "hasMore"
            takes precedence over "HasMoreAfter".
        """
        # Offset by what was actually returned rather than by the
        # requested count: sometimes fewer items come back than were
        # asked for, and relying on the count could miss posts.
        returned = data.get(self.list_key, ())
        self.cursor = self.cursor + len(returned)
        if "hasMore" in data:
            more = data["hasMore"]
        else:
            more = data.get("HasMoreAfter", False)
        return not more
|
|
|
|
|
|
class TiktokPaginationRequest:
    """Base class for (optionally paginated) requests to the TikTok API.

    Subclasses describe one endpoint by overriding the hooks
    validate_query_parameters(), cursor_type(), extract_items(),
    exit_early() and generate_urls().
    """

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self._regenerate_device_id()
        self.items = {}

    def execute(self, extractor, url, query_parameters):
        """Perform requests until all pages have been retrieved.

        Everything retrieved is stored in self.items, which is cleared
        at the start of every call.

        Extractors usually want a simple list of URLs; each request
        subtype implements generate_urls() for that purpose.

        Parameters
        ----------
        extractor : TiktokExtractor
            The TikTok extractor performing the request.
        url : str
            The URL associated with this request for logging purposes.
        query_parameters : dict[str, str]
            The query parameters to apply to this request.

        Returns
        -------
        bool
            True if the request was performed successfully and all items
            were retrieved, False if no items or only some items could
            be retrieved.
        """
        self.validate_query_parameters(query_parameters)
        self.items = {}
        make_cursor = self.cursor_type(query_parameters)
        cursor = make_cursor() if make_cursor else None

        for page in itertools.count(1):
            collected = len(self.items)
            plural = "" if collected == 1 else "s"
            extractor.log.info("%s: retrieving %s page %d (%d item%s)", url,
                               self.endpoint, page, collected, plural)
            # Failed attempts for the current page only.
            failures = 0
            while True:
                try:
                    data, sent_parameters = self._request_data(
                        extractor, cursor, query_parameters)
                    fresh = self.extract_items(data)
                    self._detect_duplicate_pages(
                        extractor, url, set(self.items), set(fresh))
                    self.items.update(fresh)
                    if not cursor:
                        # This request has no cursor: return immediately.
                        return True
                    at_end = cursor.next_page(data, sent_parameters)
                    if self.exit_early(extractor, url) or at_end:
                        return True
                    # Continue to next page and reset the counter.
                    break
                except Exception as exc:
                    if failures >= extractor._retries:
                        extractor.log.error(
                            "%s: failed to retrieve %s page %d",
                            url, self.endpoint, page)
                        extractor.log.traceback(exc)
                        return False
                    failures += 1
                    extractor.log.warning(
                        "%s: failed to retrieve %s page %d",
                        url, self.endpoint, page)
                    extractor.sleep(extractor._timeout, "retry")

    def validate_query_parameters(self, query_parameters):
        """Validate the given parameters for this type of request.

        For developer purposes only. Most subclasses should call
        super().validate_query_parameters() as they will usually have
        a count parameter.

        Parameters
        ----------
        query_parameters : dict[str, str]
            The query parameters to validate.

        Raises
        ------
        AssertionError
            If mandatory query parameters are not given, or they are
            given in the wrong format.
        """
        assert "count" in query_parameters
        count = query_parameters["count"]
        assert type(count) is str
        assert count.isdigit()
        assert count != "0"

    def cursor_type(self, query_parameters):
        """Determine which type of cursor to use for this request.

        Parameters
        ----------
        query_parameters : dict[str, str]
            The query parameters given to the execute() call.

        Returns
        -------
        Type[TiktokPaginationCursor] | None
            The type of cursor to use, if any. The base request uses
            none.
        """
        no_cursor = None
        return no_cursor

    def extract_items(self, data):
        """Extract the items from the response of a request.

        Parameters
        ----------
        data : dict
            The data given by TikTok.

        Returns
        -------
        dict
            Each item from the response data, keyed on a unique ID.

        Raises
        ------
        Exception
            If items could not be extracted.
        """
        nothing = {}
        return nothing

    def exit_early(self, extractor, url):
        """Decide whether the request should stop before the last page.

        The items extracted so far are available in self.items.

        Parameters
        ----------
        extractor : TiktokExtractor
            The extractor making the requests.
        url : str
            The URL associated with the executing request for logging
            purposes.

        Returns
        -------
        bool
            True if we should exit early, False otherwise.
        """
        stop_now = False
        return stop_now

    def generate_urls(self):
        """Convert the items retrieved from the request into URLs.

        Returns
        -------
        list | dict
            Ideally one URL for each item, pointing to a video detail
            object, although subclasses are permitted to return a list
            or dict of any format they wish. The base request returns
            an empty list.
        """
        no_urls = []
        return no_urls

    def _regenerate_device_id(self):
        """Store a new random device ID (as a string) in self.device_id."""
        device_number = random.randint(
            7_250_000_000_000_000_000, 7_325_099_899_999_994_577)
        self.device_id = str(device_number)

    def _request_data(self, extractor, cursor, query_parameters):
        """Request one page and decode it.

        Returns a tuple of the decoded response and the full set of
        query parameters that were sent. A ValueError is retried once
        without delay; a second one is re-raised.
        """
        # Implement simple 1 retry mechanism without delays that handles
        # the flaky post/item_list endpoint.
        for last_attempt in (False, True):
            try:
                url, sent_parameters = self._build_api_request_url(
                    cursor, query_parameters)
                response = extractor.request(url)
                decoded = util.json_loads(response.text)
                return (decoded, sent_parameters)
            except ValueError:
                if last_attempt:
                    raise
                extractor.log.warning(
                    "Could not decode response for this page, trying again"
                )

    def _build_api_request_url(self, cursor, extra_parameters):
        """Assemble the API URL for the current page.

        Returns a tuple of the URL and the complete query parameter
        dict; extra_parameters override the defaults and are stored as
        strings.
        """
        fingerprint = "".join(random.choices("0123456789abcdef", k=7))
        query_parameters = {
            "aid": "1988",
            "app_language": "en",
            "app_name": "tiktok_web",
            "browser_language": "en-US",
            "browser_name": "Mozilla",
            "browser_online": "true",
            "browser_platform": "Win32",
            "browser_version": "5.0 (Windows)",
            "channel": "tiktok_web",
            "cookie_enabled": "true",
            "device_id": self.device_id,
            "device_platform": "web_pc",
            "focus_state": "true",
            "from_page": "user",
            "history_len": "2",
            "is_fullscreen": "false",
            "is_page_visible": "true",
            "language": "en",
            "os": "windows",
            "priority_region": "",
            "referer": "",
            "region": "US",
            "screen_height": "1080",
            "screen_width": "1920",
            "tz_name": "UTC",
            "verifyFp": "verify_" + fingerprint,
            "webcast_language": "en",
        }
        if cursor:
            # We must not write this as a floating-point number:
            page_value = int(cursor.current_page())
            query_parameters["cursor"] = str(page_value)
        query_parameters.update(
            (key, f"{value}") for key, value in extra_parameters.items())
        query_str = text.build_query(query_parameters)
        api_url = f"https://www.tiktok.com/api/{self.endpoint}/?{query_str}"
        return (api_url, query_parameters)

    def _detect_duplicate_pages(self, extractor, url, seen_ids, incoming_ids):
        """Raise if the incoming page holds exactly the IDs already seen.

        Raises
        ------
        exception.ExtractionError
            If incoming_ids is non-empty and equal to seen_ids.
        """
        if not incoming_ids or incoming_ids != seen_ids:
            return
        # TikTok API keeps sending the same page, likely due to
        # a bad device ID. Generate a new one and try again.
        self._regenerate_device_id()
        extractor.log.warning("%s: TikTok API keeps sending the same "
                              "page. Taking measures to avoid an infinite "
                              "loop", url)
        raise exception.ExtractionError(
            "TikTok API keeps sending the same page")
|
|
|
|
|
|
class TiktokItemListRequest(TiktokPaginationRequest):
    """Request for endpoints that answer with an "itemList" of posts."""

    def __init__(self, endpoint, type_of_items, range_predicate):
        """
        Parameters
        ----------
        endpoint : str
            The API endpoint to request.
        type_of_items : str
            Name of the requested items, used in log messages.
        range_predicate : list | None
            Ranges of 1-based item indices to keep; a falsy value
            disables range filtering.
        """
        super().__init__(endpoint)
        self.type_of_items = type_of_items
        self.range_predicate = range_predicate
        self.exit_early_due_to_no_items = False

    def extract_items(self, data):
        """Return the items of data["itemList"] keyed on their "id".

        When the list is absent and the response reports no further
        pages, the request is flagged to exit early.
        """
        if "itemList" in data:
            return {item["id"]: item for item in data["itemList"]}
        more_expected = data.get("hasMorePrevious",
                                 data.get("hasMore", False))
        if not more_expected:
            self.exit_early_due_to_no_items = True
        return {}

    def exit_early(self, extractor, url):
        """Stop when no items could be found or the range is satisfied."""
        if self.exit_early_due_to_no_items:
            extractor.log.warning(
                "%s: could not extract any %s for this user",
                url, self.type_of_items)
            return True
        if not self.range_predicate:
            # No range predicates given.
            return False
        # If our current selection of items can't satisfy the upper bound
        # of the predicate, we must continue extracting them until we can.
        upper_bound = max(r.stop for r in self.range_predicate)
        return len(self.items) > upper_bound - 1

    def generate_urls(self, profile_url, video, photo, audio):
        """Map a post URL to each retrieved item that passes the filters.

        Parameters
        ----------
        profile_url : str
            Profile URL used when an item carries no author unique ID.
        video, photo, audio : bool
            Which kinds of posts to keep (see _matches_filters()).

        Returns
        -------
        dict
            Post URL -> item.
        """
        urls = {}
        numbered = enumerate(self.items.items(), start=1)
        for position, (item_id, item) in numbered:
            if not self._matches_filters(item, position, video, photo,
                                         audio):
                continue
            # Try to grab the author's unique ID, but don't cause the
            # extraction to fail if we can't, it's not imperative that the
            # URLs include the actual poster's unique ID.
            try:
                author = item['author']['uniqueId']
                url = f"https://www.tiktok.com/@{author}/video/{item_id}"
            except KeyError:
                # Use the given profile URL as a back up.
                url = f"{profile_url}/video/{item_id}"
            urls[url] = item
        return urls

    def _matches_filters(self, item, index, video, photo, audio):
        """Tell whether the item at the 1-based index should be kept."""
        # First, check if this index falls within any of our configured
        # ranges. If it doesn't, we filter it out.
        if self.range_predicate:
            if not any(index in span for span in self.range_predicate):
                return False

        # Then, we apply basic video/photo filtering.
        if not item:
            return True
        if "imagePost" in item:
            # Image posts are kept when photos or audio are wanted.
            wanted = photo or audio
        else:
            wanted = video
        return bool(wanted)
|
|
|
|
|
|
class TiktokCreatorItemListRequest(TiktokItemListRequest):
    """A less flaky version of the post/item_list endpoint that doesn't
    support latest/popular/oldest ordering."""

    def __init__(self, range_predicate):
        super().__init__("creator/item_list", "posts", range_predicate)

    def validate_query_parameters(self, query_parameters):
        """Require "count", "secUid" and a "type" of "0" or "1"."""
        super().validate_query_parameters(query_parameters)
        for required in ("secUid", "type"):
            assert required in query_parameters
        # Pagination type: 0 == oldest-to-newest, 1 == newest-to-oldest.
        # NOTE: ^ this type parameter doesn't seem to do what yt-dlp thinks
        #       it does. post/item_list is the only way to get an ordered
        #       feed based on latest/popular/oldest.
        pagination_type = query_parameters["type"]
        assert pagination_type in ("0", "1")

    def cursor_type(self, query_parameters):
        """Always page with TiktokLegacyTimeCursor."""
        return TiktokLegacyTimeCursor
|
|
|
|
|
|
class TiktokPostItemListRequest(TiktokItemListRequest):
    """Retrieves posts in latest/popular/oldest ordering.

    Very often, this request will just return an empty response, making
    it quite flaky, but the next attempt to make the request usually
    does return a response. For this reason creator/item_list was kept
    as a backup, though it doesn't seem to support ordering.

    It also doesn't work without cookies.
    """

    def __init__(self, range_predicate):
        super().__init__("post/item_list", "posts", range_predicate)

    def validate_query_parameters(self, query_parameters):
        """Require "count", "secUid", a known ordering and unpinned
        results."""
        super().validate_query_parameters(query_parameters)
        assert "secUid" in query_parameters
        assert "post_item_list_request_type" in query_parameters
        # Pagination type:
        # 0 == newest-to-oldest.
        # 1 == popular.
        # 2 == oldest-to-newest.
        ordering = query_parameters["post_item_list_request_type"]
        assert ordering in ["0", "1", "2"]
        assert "needPinnedItemIds" in query_parameters
        # If this value is set to "true", and "post_item_list_request_type"
        # is set to "0", pinned posts will always show up first in the
        # resulting itemList. It keeps our logic simpler if we avoid this
        # behavior by setting this parameter to "false" (especially if we
        # were to use a really small "count" value like "1" or "2").
        assert query_parameters["needPinnedItemIds"] in ["false"]

    def cursor_type(self, query_parameters):
        """Pick the cursor matching the requested ordering."""
        ordering = query_parameters["post_item_list_request_type"]
        if ordering == "2":
            return TiktokForwardTimeCursor
        if ordering == "1":
            return TiktokPopularTimeCursor
        return TiktokBackwardTimeCursor
|
|
|
|
|
|
class TiktokFavoriteItemListRequest(TiktokItemListRequest):
    """Retrieves a user's liked posts.

    Appears to only support descending order, but it can work without
    cookies.
    """

    def __init__(self, range_predicate):
        super().__init__("favorite/item_list", "liked posts", range_predicate)

    def validate_query_parameters(self, query_parameters):
        """Require "count", "secUid", request type "0" and unpinned
        results."""
        super().validate_query_parameters(query_parameters)
        assert "secUid" in query_parameters
        assert "post_item_list_request_type" in query_parameters
        request_type = query_parameters["post_item_list_request_type"]
        assert request_type == "0"
        assert "needPinnedItemIds" in query_parameters
        need_pinned = query_parameters["needPinnedItemIds"]
        assert need_pinned in ["false"]

    def cursor_type(self, query_parameters):
        """Always page with TiktokPopularTimeCursor."""
        return TiktokPopularTimeCursor
|
|
|
|
|
|
class TiktokRepostItemListRequest(TiktokItemListRequest):
    """Retrieves a user's reposts.

    Appears to only support descending order, but it can work without
    cookies.
    """

    def __init__(self, range_predicate):
        super().__init__("repost/item_list", "reposts", range_predicate)

    def validate_query_parameters(self, query_parameters):
        """Require "count", "secUid", request type "0" and unpinned
        results."""
        super().validate_query_parameters(query_parameters)
        assert "secUid" in query_parameters
        assert "post_item_list_request_type" in query_parameters
        request_type = query_parameters["post_item_list_request_type"]
        assert request_type == "0"
        assert "needPinnedItemIds" in query_parameters
        need_pinned = query_parameters["needPinnedItemIds"]
        assert need_pinned in ["false"]

    def cursor_type(self, query_parameters):
        """Always page with TiktokItemCursor."""
        return TiktokItemCursor
|
|
|
|
|
|
class TiktokSavedPostItemListRequest(TiktokItemListRequest):
    """Retrieves a user's saved posts.

    Appears to only support descending order, but it can work without
    cookies.
    """

    def __init__(self, range_predicate):
        endpoint = "user/collect/item_list"
        super().__init__(endpoint, "saved posts", range_predicate)

    def validate_query_parameters(self, query_parameters):
        """Require "count", "secUid", request type "0" and unpinned
        results."""
        super().validate_query_parameters(query_parameters)
        assert "secUid" in query_parameters
        assert "post_item_list_request_type" in query_parameters
        request_type = query_parameters["post_item_list_request_type"]
        assert request_type == "0"
        assert "needPinnedItemIds" in query_parameters
        need_pinned = query_parameters["needPinnedItemIds"]
        assert need_pinned in ["false"]

    def cursor_type(self, query_parameters):
        """Always page with TiktokPopularTimeCursor."""
        return TiktokPopularTimeCursor
|
|
|
|
|
|
class TiktokStoryItemListRequest(TiktokItemListRequest):
    """Retrieves the stories of a single author (story/item_list)."""

    def __init__(self):
        # Stories are never range-filtered.
        super().__init__("story/item_list", "stories", None)

    def validate_query_parameters(self, query_parameters):
        """Require "count", "authorId" and a "loadBackward" flag."""
        super().validate_query_parameters(query_parameters)
        for required in ("authorId", "loadBackward"):
            assert required in query_parameters
        load_backward = query_parameters["loadBackward"]
        assert load_backward in ["true", "false"]

    def cursor_type(self, query_parameters):
        """Always page with TiktokStoryTimeCursor."""
        return TiktokStoryTimeCursor
|
|
|
|
|
|
class TiktokStoryBatchItemListRequest(TiktokItemListRequest):
    """Retrieves the stories of several authors in one request
    (story/batch/item_list)."""

    def __init__(self):
        # Stories are never range-filtered.
        super().__init__("story/batch/item_list", "stories", None)

    def validate_query_parameters(self, query_parameters):
        """Require 1-10 comma-separated "authorIds" and
        "storyCallScene" == "2"."""
        # This request type does not need a count parameter so don't
        # invoke super().validate_query_parameters().
        assert "authorIds" in query_parameters
        # I'd recommend between 1-10 users at a time, as that's what I see
        # in the webapp.
        author_count = query_parameters["authorIds"].count(",") + 1
        assert 1 <= author_count <= 10
        # Not sure what this parameter does.
        assert "storyCallScene" in query_parameters
        assert query_parameters["storyCallScene"] == "2"

    def extract_items(self, data):
        """Merge the item lists of every entry in
        data["batchStoryItemLists"] into a single dict keyed on item ID."""
        # We need to extract each itemList within the response and combine
        # each of them into a single list of items. If even one of the
        # users doesn't have an item list, "exit early," but continue to
        # gather the rest (this request doesn't use a cursor anyway so
        # there is no concept of exiting early).
        merged = {}
        if type(data.get("batchStoryItemLists")) is not list:
            self.exit_early_due_to_no_items = True
            return merged
        for user_stories in data["batchStoryItemLists"]:
            merged.update(super().extract_items(user_stories))
        return merged
|
|
|
|
|
|
class TiktokStoryUserListRequest(TiktokPaginationRequest):
    """Retrieves the users listed by the story/user_list endpoint."""

    def __init__(self):
        super().__init__("story/user_list")
        self.exit_early_due_to_no_cookies = False

    def validate_query_parameters(self, query_parameters):
        """Require "count" and "storyFeedScene" == "3"."""
        super().validate_query_parameters(query_parameters)
        # Not sure what this parameter does.
        assert "storyFeedScene" in query_parameters
        scene = query_parameters["storyFeedScene"]
        assert scene == "3"

    def cursor_type(self, query_parameters):
        """Page with a TiktokItemCursor counting "storyUsers" entries."""
        return functools.partial(TiktokItemCursor, "storyUsers")

    def extract_items(self, data):
        """Map each story user's ID to their unique ID.

        A response without "storyUsers" flags the request to exit
        early.
        """
        if "storyUsers" in data:
            users = (entry["user"] for entry in data["storyUsers"])
            return {user["id"]: user["uniqueId"] for user in users}
        self.exit_early_due_to_no_cookies = True
        return {}

    def exit_early(self, extractor, url):
        """Log an error and stop if no story users could be read."""
        stop = self.exit_early_due_to_no_cookies
        if stop:
            extractor.log.error("You must provide cookies to extract the "
                                "stories of your following list")
        return stop

    def generate_urls(self):
        """Return a list of (user ID, profile URL) tuples."""
        return [(user_id, f"https://www.tiktok.com/@{unique_id}")
                for user_id, unique_id in self.items.items()]
|