[common] add 'request_location()' convenience function
This commit is contained in:
@@ -240,6 +240,11 @@ class Extractor():
|
||||
|
||||
raise exception.HttpError(msg, response)
|
||||
|
||||
def request_location(self, url, **kwargs):
    """Return the 'location' response header for 'url', or "" if absent.

    The request is issued through self.request(). 'method' defaults to
    "HEAD" and 'allow_redirects' defaults to False; values supplied by the
    caller for these keys take precedence. All other keyword arguments
    are forwarded to self.request() unchanged.
    """
    # setdefault() keeps caller-provided overrides intact
    kwargs.setdefault("method", "HEAD")
    kwargs.setdefault("allow_redirects", False)
    # fall back to an empty string so callers can use a plain truth test
    return self.request(url, **kwargs).headers.get("location", "")
|
||||
|
||||
_handle_429 = util.false
|
||||
|
||||
def wait(self, seconds=None, until=None, adjust=1.0,
|
||||
|
||||
@@ -296,8 +296,7 @@ class FanboxExtractor(Extractor):
|
||||
url = "https://www.pixiv.net/fanbox/"+content_id
|
||||
# resolve redirect
|
||||
try:
|
||||
url = self.request(url, method="HEAD",
|
||||
allow_redirects=False).headers["location"]
|
||||
url = self.request_location(url)
|
||||
except Exception as exc:
|
||||
url = None
|
||||
self.log.warning("Unable to extract fanbox embed %s (%s: %s)",
|
||||
@@ -392,13 +391,7 @@ class FanboxRedirectExtractor(Extractor):
|
||||
pattern = r"(?:https?://)?(?:www\.)?pixiv\.net/fanbox/creator/(\d+)"
|
||||
example = "https://www.pixiv.net/fanbox/creator/12345"
|
||||
|
||||
def __init__(self, match):
|
||||
Extractor.__init__(self, match)
|
||||
self.user_id = match.group(1)
|
||||
|
||||
def items(self):
|
||||
url = "https://www.pixiv.net/fanbox/creator/" + self.user_id
|
||||
data = {"_extractor": FanboxCreatorExtractor}
|
||||
response = self.request(
|
||||
url, method="HEAD", allow_redirects=False, notfound="user")
|
||||
yield Message.Queue, response.headers["Location"], data
|
||||
url = "https://www.pixiv.net/fanbox/creator/" + self.groups[0]
|
||||
location = self.request_location(url, notfound="user")
|
||||
yield Message.Queue, location, {"_extractor": FanboxCreatorExtractor}
|
||||
|
||||
@@ -109,11 +109,7 @@ class PatreonExtractor(Extractor):
|
||||
|
||||
def _attachments(self, post):
|
||||
for attachment in post.get("attachments") or ():
|
||||
url = self.request(
|
||||
attachment["url"], method="HEAD",
|
||||
allow_redirects=False, fatal=False,
|
||||
).headers.get("Location")
|
||||
|
||||
url = self.request_location(attachment["url"], fatal=False)
|
||||
if url:
|
||||
yield "attachment", url, attachment["name"]
|
||||
|
||||
|
||||
@@ -380,15 +380,10 @@ class PinterestPinitExtractor(PinterestExtractor):
|
||||
pattern = r"(?:https?://)?pin\.it/([^/?#]+)"
|
||||
example = "https://pin.it/abcde"
|
||||
|
||||
def __init__(self, match):
|
||||
PinterestExtractor.__init__(self, match)
|
||||
self.shortened_id = match.group(1)
|
||||
|
||||
def items(self):
|
||||
url = "https://api.pinterest.com/url_shortener/{}/redirect/".format(
|
||||
self.shortened_id)
|
||||
response = self.request(url, method="HEAD", allow_redirects=False)
|
||||
location = response.headers.get("Location")
|
||||
self.groups[0])
|
||||
location = self.request_location(url)
|
||||
if not location or not PinterestPinExtractor.pattern.match(location):
|
||||
raise exception.NotFoundError("pin")
|
||||
yield Message.Queue, location, {"_extractor": PinterestPinExtractor}
|
||||
|
||||
@@ -516,16 +516,10 @@ class PixivMeExtractor(PixivExtractor):
|
||||
pattern = r"(?:https?://)?pixiv\.me/([^/?#]+)"
|
||||
example = "https://pixiv.me/USER"
|
||||
|
||||
def __init__(self, match):
|
||||
PixivExtractor.__init__(self, match)
|
||||
self.account = match.group(1)
|
||||
|
||||
def items(self):
|
||||
url = "https://pixiv.me/" + self.account
|
||||
data = {"_extractor": PixivUserExtractor}
|
||||
response = self.request(
|
||||
url, method="HEAD", allow_redirects=False, notfound="user")
|
||||
yield Message.Queue, response.headers["Location"], data
|
||||
url = "https://pixiv.me/" + self.groups[0]
|
||||
location = self.request_location(url, notfound="user")
|
||||
yield Message.Queue, location, {"_extractor": PixivUserExtractor}
|
||||
|
||||
|
||||
class PixivWorkExtractor(PixivExtractor):
|
||||
|
||||
@@ -153,17 +153,13 @@ class PostmillPostExtractor(PostmillExtractor):
|
||||
class PostmillShortURLExtractor(PostmillExtractor):
|
||||
"""Extractor for short submission URLs"""
|
||||
subcategory = "shorturl"
|
||||
pattern = BASE_PATTERN + r"/(\d+)$"
|
||||
pattern = BASE_PATTERN + r"(/\d+)$"
|
||||
example = "https://raddle.me/123"
|
||||
|
||||
def __init__(self, match):
|
||||
PostmillExtractor.__init__(self, match)
|
||||
self.post_id = match.group(3)
|
||||
|
||||
def items(self):
|
||||
url = self.root + "/" + self.post_id
|
||||
response = self.request(url, method="HEAD", allow_redirects=False)
|
||||
full_url = text.urljoin(url, response.headers["Location"])
|
||||
url = self.root + self.groups[2]
|
||||
location = self.request_location(url)
|
||||
full_url = text.urljoin(url, location)
|
||||
yield Message.Queue, full_url, {"_extractor": PostmillPostExtractor}
|
||||
|
||||
|
||||
|
||||
@@ -357,10 +357,9 @@ class RedditRedirectExtractor(Extractor):
|
||||
sub_type = "user"
|
||||
url = "https://www.reddit.com/{}/{}/s/{}".format(
|
||||
sub_type, subreddit, share_url)
|
||||
location = self.request_location(url, notfound="submission")
|
||||
data = {"_extractor": RedditSubmissionExtractor}
|
||||
response = self.request(url, method="HEAD", allow_redirects=False,
|
||||
notfound="submission")
|
||||
yield Message.Queue, response.headers["Location"], data
|
||||
yield Message.Queue, location, data
|
||||
|
||||
|
||||
class RedditAPI():
|
||||
|
||||
@@ -43,9 +43,7 @@ class SeigaExtractor(Extractor):
|
||||
def get_image_url(self, image_id):
|
||||
"""Get url for an image with id 'image_id'"""
|
||||
url = "{}/image/source/{}".format(self.root, image_id)
|
||||
response = self.request(
|
||||
url, method="HEAD", allow_redirects=False, notfound="image")
|
||||
location = response.headers["location"]
|
||||
location = self.request_location(url, notfound="image")
|
||||
if "nicovideo.jp/login" in location:
|
||||
raise exception.StopExtraction(
|
||||
"HTTP redirect to login page (%s)", location.partition("?")[0])
|
||||
|
||||
@@ -183,10 +183,7 @@ class TiktokVmpostExtractor(TiktokExtractor):
|
||||
url = text.ensure_http_scheme(self.url)
|
||||
headers = {"User-Agent": "facebookexternalhit/1.1"}
|
||||
|
||||
response = self.request(url, headers=headers, method="HEAD",
|
||||
allow_redirects=False, notfound="post")
|
||||
|
||||
url = response.headers.get("Location")
|
||||
url = self.request_location(url, headers=headers, notfound="post")
|
||||
if not url or len(url) <= 28:
|
||||
# https://www.tiktok.com/?_r=1
|
||||
raise exception.NotFoundError("post")
|
||||
|
||||
@@ -32,21 +32,13 @@ BASE_PATTERN = UrlshortenerExtractor.update({
|
||||
class UrlshortenerLinkExtractor(UrlshortenerExtractor):
|
||||
"""Extractor for general-purpose URL shorteners"""
|
||||
subcategory = "link"
|
||||
pattern = BASE_PATTERN + r"/([^/?#]+)"
|
||||
pattern = BASE_PATTERN + r"(/[^/?#]+)"
|
||||
example = "https://bit.ly/abcde"
|
||||
|
||||
def __init__(self, match):
|
||||
UrlshortenerExtractor.__init__(self, match)
|
||||
self.id = match.group(match.lastindex)
|
||||
|
||||
def _init(self):
|
||||
self.headers = self.config_instance("headers")
|
||||
|
||||
def items(self):
|
||||
response = self.request(
|
||||
"{}/{}".format(self.root, self.id), headers=self.headers,
|
||||
method="HEAD", allow_redirects=False, notfound="URL")
|
||||
try:
|
||||
yield Message.Queue, response.headers["location"], {}
|
||||
except KeyError:
|
||||
url = self.root + self.groups[-1]
|
||||
location = self.request_location(
|
||||
url, headers=self.config_instance("headers"), notfound="URL")
|
||||
if not location:
|
||||
raise exception.StopExtraction("Unable to resolve short URL")
|
||||
yield Message.Queue, location, {}
|
||||
|
||||
Reference in New Issue
Block a user