[urlshortener] update

This commit is contained in:
Mike Fährmann
2023-04-15 18:06:06 +02:00
parent 875485313f
commit 5e63942b37
2 changed files with 43 additions and 34 deletions

View File

@@ -1276,13 +1276,13 @@ Consider all sites to be NSFW unless otherwise known.
<tr>
<td>Bitly</td>
<td>https://bit.ly/</td>
<td></td>
<td>Links</td>
<td></td>
</tr>
<tr>
<td>Twitter t.co</td>
<td>https://t.co/</td>
<td></td>
<td>Links</td>
<td></td>
</tr>

View File

@@ -4,43 +4,15 @@
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractor for general-purpose URL shorteners"""
"""Extractors for general-purpose URL shorteners"""
from .common import BaseExtractor, Message
from .. import exception
class UrlshortenerExtractor(BaseExtractor):
"""Extractor for general-purpose URL shorteners"""
"""Base class for URL shortener extractors"""
basecategory = "urlshortener"
test = (
("https://bit.ly/3cWIUgq", {
"count": 1,
"pattern": "^https://gumroad.com/l/storm_b1"
}),
("https://t.co/bCgBY8Iv5n", {
"count": 1,
"pattern": ("^https://twitter.com/elonmusk/status/"
"1421395561324896257/photo/1")
}),
)
def __init__(self, match):
BaseExtractor.__init__(self, match)
self.headers = INSTANCES[self.category].get("headers")
self.id = match.group(match.lastindex)
def request(self, url, **kwargs):
kwargs["headers"] = self.headers
return BaseExtractor.request(self, url, **kwargs)
def items(self):
response = self.request(
"{}/{}".format(self.root, self.id), method="HEAD",
allow_redirects=False, notfound="URL")
if "location" not in response.headers:
raise exception.StopExtraction("Unable to resolve short URL")
yield Message.Queue, response.headers["location"], {}
INSTANCES = {
@@ -56,5 +28,42 @@ INSTANCES = {
},
}
UrlshortenerExtractor.pattern = \
UrlshortenerExtractor.update(INSTANCES) + r"/([^/?#&]+)"
BASE_PATTERN = UrlshortenerExtractor.update(INSTANCES)
class UrlshortenerLinkExtractor(UrlshortenerExtractor):
"""Extractor for general-purpose URL shorteners"""
subcategory = "link"
pattern = BASE_PATTERN + r"/([^/?&#]+)"
test = (
("https://bit.ly/3cWIUgq", {
"count": 1,
"pattern": "^https://gumroad.com/l/storm_b1",
}),
("https://t.co/bCgBY8Iv5n", {
"count": 1,
"pattern": "^https://twitter.com/elonmusk/status/"
"1421395561324896257/photo/1",
}),
("https://t.co/abcdefghij", {
"exception": exception.NotFoundError,
}),
)
def __init__(self, match):
UrlshortenerExtractor.__init__(self, match)
self.id = match.group(match.lastindex)
try:
self.headers = INSTANCES[self.category]["headers"]
except Exception:
self.headers = None
def items(self):
response = self.request(
"{}/{}".format(self.root, self.id), headers=self.headers,
method="HEAD", allow_redirects=False, notfound="URL")
try:
yield Message.Queue, response.headers["location"], {}
except KeyError:
raise exception.StopExtraction("Unable to resolve short URL")