[instagram] update

- reorder some functions and extractors
- add missing GraphQL endpoints
- fix some GraphQL bugs
This commit is contained in:
Mike Fährmann
2022-09-27 14:17:36 +02:00
parent aa49bf13d2
commit 2c67bee5c4

View File

@@ -131,86 +131,8 @@ class InstagramExtractor(Extractor):
self.session.cookies.set(
"csrftoken", self.csrf_token, domain=self.cookiedomain)
def _parse_post_graphql(self, post):
typename = post["__typename"]
if post.get("is_video") and "video_url" not in post:
post = self.api.media(post["id"])
elif typename == "GraphSidecar" and \
"edge_sidecar_to_children" not in post:
post = self.api.media(post["id"])
pinned = post.get("pinned_for_users", ())
if pinned:
for index, user in enumerate(pinned):
pinned[index] = int(user["id"])
owner = post["owner"]
data = {
"typename" : typename,
"date" : text.parse_timestamp(post["taken_at_timestamp"]),
"likes" : post["edge_media_preview_like"]["count"],
"pinned" : pinned,
"owner_id" : owner["id"],
"username" : owner.get("username"),
"fullname" : owner.get("full_name"),
"post_id" : post["id"],
"post_shortcode": post["shortcode"],
"post_url" : "{}/p/{}/".format(self.root, post["shortcode"]),
"description": text.parse_unicode_escapes("\n".join(
edge["node"]["text"]
for edge in post["edge_media_to_caption"]["edges"]
)),
}
tags = self._find_tags(data["description"])
if tags:
data["tags"] = sorted(set(tags))
location = post.get("location")
if location:
data["location_id"] = location["id"]
data["location_slug"] = location["slug"]
data["location_url"] = "{}/explore/locations/{}/{}/".format(
self.root, location["id"], location["slug"])
data["_files"] = files = []
if "edge_sidecar_to_children" in post:
for num, edge in enumerate(
post["edge_sidecar_to_children"]["edges"], 1):
node = edge["node"]
dimensions = node["dimensions"]
media = {
"num": num,
"media_id" : node["id"],
"shortcode" : (node.get("shortcode") or
shortcode_from_id(node["id"])),
"display_url": node["display_url"],
"video_url" : node.get("video_url"),
"width" : dimensions["width"],
"height" : dimensions["height"],
"sidecar_media_id" : post["id"],
"sidecar_shortcode": post["shortcode"],
}
self._extract_tagged_users(node, media)
files.append(media)
else:
dimensions = post["dimensions"]
media = {
"media_id" : post["id"],
"shortcode" : post["shortcode"],
"display_url": post["display_url"],
"video_url" : post.get("video_url"),
"width" : dimensions["width"],
"height" : dimensions["height"],
}
self._extract_tagged_users(post, media)
files.append(media)
return data
def _parse_post_rest(self, post):
if "items" in post:
if "items" in post: # story or highlight
items = post["items"]
reel_id = str(post["id"]).rpartition(":")[2]
data = {
@@ -224,7 +146,7 @@ class InstagramExtractor(Extractor):
if "created_at" in post:
data["date"] = text.parse_timestamp(post.get("created_at"))
else:
else: # regular image/video post
data = {
"post_id" : post["pk"],
"post_shortcode": post["code"],
@@ -298,6 +220,85 @@ class InstagramExtractor(Extractor):
return data
def _parse_post_graphql(self, post):
typename = post["__typename"]
if self._logged_in:
if post.get("is_video") and "video_url" not in post:
post = self.api.media(post["id"])[0]
elif typename == "GraphSidecar" and \
"edge_sidecar_to_children" not in post:
post = self.api.media(post["id"])[0]
pinned = post.get("pinned_for_users", ())
if pinned:
for index, user in enumerate(pinned):
pinned[index] = int(user["id"])
owner = post["owner"]
data = {
"typename" : typename,
"date" : text.parse_timestamp(post["taken_at_timestamp"]),
"likes" : post["edge_media_preview_like"]["count"],
"pinned" : pinned,
"owner_id" : owner["id"],
"username" : owner.get("username"),
"fullname" : owner.get("full_name"),
"post_id" : post["id"],
"post_shortcode": post["shortcode"],
"post_url" : "{}/p/{}/".format(self.root, post["shortcode"]),
"description": text.parse_unicode_escapes("\n".join(
edge["node"]["text"]
for edge in post["edge_media_to_caption"]["edges"]
)),
}
tags = self._find_tags(data["description"])
if tags:
data["tags"] = sorted(set(tags))
location = post.get("location")
if location:
data["location_id"] = location["id"]
data["location_slug"] = location["slug"]
data["location_url"] = "{}/explore/locations/{}/{}/".format(
self.root, location["id"], location["slug"])
data["_files"] = files = []
if "edge_sidecar_to_children" in post:
for num, edge in enumerate(
post["edge_sidecar_to_children"]["edges"], 1):
node = edge["node"]
dimensions = node["dimensions"]
media = {
"num": num,
"media_id" : node["id"],
"shortcode" : (node.get("shortcode") or
shortcode_from_id(node["id"])),
"display_url": node["display_url"],
"video_url" : node.get("video_url"),
"width" : dimensions["width"],
"height" : dimensions["height"],
"sidecar_media_id" : post["id"],
"sidecar_shortcode": post["shortcode"],
}
self._extract_tagged_users(node, media)
files.append(media)
else:
dimensions = post["dimensions"]
media = {
"media_id" : post["id"],
"shortcode" : post["shortcode"],
"display_url": post["display_url"],
"video_url" : post.get("video_url"),
"width" : dimensions["width"],
"height" : dimensions["height"],
}
self._extract_tagged_users(post, media)
files.append(media)
return data
@staticmethod
def _extract_tagged_users(src, dest):
dest["tagged_users"] = tagged_users = []
@@ -355,13 +356,13 @@ class InstagramUserExtractor(InstagramExtractor):
(InstagramHighlightsExtractor, base + "highlights/"),
(InstagramPostsExtractor , base + "posts/"),
(InstagramReelsExtractor , base + "reels/"),
(InstagramChannelExtractor , base + "channel/"),
(InstagramTaggedExtractor , base + "tagged/"),
(InstagramChannelExtractor , base + "channel/"),
), ("posts",))
class InstagramPostsExtractor(InstagramExtractor):
"""Extractor for ProfilePage posts"""
"""Extractor for an Instagram user's posts"""
subcategory = "posts"
pattern = USER_PATTERN + r"/posts"
test = ("https://www.instagram.com/instagram/posts/", {
@@ -374,8 +375,22 @@ class InstagramPostsExtractor(InstagramExtractor):
return self.api.user_feed(uid)
class InstagramReelsExtractor(InstagramExtractor):
"""Extractor for an Instagram user's reels"""
subcategory = "reels"
pattern = USER_PATTERN + r"/reels"
test = ("https://www.instagram.com/instagram/reels/", {
"range": "40-60",
"count": ">= 20",
})
def posts(self):
uid = self.api.user_id(self.item)
return self.api.user_clips(uid)
class InstagramTaggedExtractor(InstagramExtractor):
"""Extractor for ProfilePage tagged posts"""
"""Extractor for an Instagram user's tagged posts"""
subcategory = "tagged"
pattern = USER_PATTERN + r"/tagged"
test = ("https://www.instagram.com/instagram/tagged/", {
@@ -407,7 +422,7 @@ class InstagramTaggedExtractor(InstagramExtractor):
class InstagramChannelExtractor(InstagramExtractor):
"""Extractor for ProfilePage channel"""
"""Extractor for an Instagram user's channel posts"""
subcategory = "channel"
pattern = USER_PATTERN + r"/channel"
test = ("https://www.instagram.com/instagram/channel/", {
@@ -421,7 +436,7 @@ class InstagramChannelExtractor(InstagramExtractor):
class InstagramSavedExtractor(InstagramExtractor):
"""Extractor for ProfilePage saved media"""
"""Extractor for an Instagram user's saved media"""
subcategory = "saved"
pattern = USER_PATTERN + r"/saved(?:/all-posts)?/?$"
test = (
@@ -434,7 +449,7 @@ class InstagramSavedExtractor(InstagramExtractor):
class InstagramCollectionExtractor(InstagramExtractor):
"""Extractor for ProfilePage saved collection media"""
"""Extractor for Instagram collection"""
subcategory = "collection"
pattern = USER_PATTERN + r"/saved/([^/?#]+)/([^/?#]+)"
test = (
@@ -455,8 +470,56 @@ class InstagramCollectionExtractor(InstagramExtractor):
return self.api.user_collection(self.collection_id)
class InstagramStoriesExtractor(InstagramExtractor):
"""Extractor for Instagram stories"""
subcategory = "stories"
pattern = (r"(?:https?://)?(?:www\.)?instagram\.com"
r"/stories/(?:highlights/(\d+)|([^/?#]+)(?:/(\d+))?)")
test = (
("https://www.instagram.com/stories/instagram/"),
("https://www.instagram.com/stories/highlights/18042509488170095/"),
("https://instagram.com/stories/geekmig/2724343156064789461"),
)
def __init__(self, match):
self.highlight_id, self.user, self.media_id = match.groups()
if self.highlight_id:
self.subcategory = InstagramHighlightsExtractor.subcategory
InstagramExtractor.__init__(self, match)
def posts(self):
if self.highlight_id:
reel_id = "highlight:" + self.highlight_id
else:
reel_id = self.api.user_id(self.user)
reels = self.api.reels_media(reel_id)
if self.media_id and reels:
reel = reels[0]
for item in reel["items"]:
if item["pk"] == self.media_id:
reel["items"] = (item,)
break
else:
raise exception.NotFoundError("story")
return reels
class InstagramHighlightsExtractor(InstagramExtractor):
"""Extractor for an Instagram user's story highlights"""
subcategory = "highlights"
pattern = USER_PATTERN + r"/highlights"
test = ("https://www.instagram.com/instagram/highlights",)
def posts(self):
uid = self.api.user_id(self.item)
return self.api.highlights_media(uid)
class InstagramTagExtractor(InstagramExtractor):
"""Extractor for TagPage"""
"""Extractor for Instagram tags"""
subcategory = "tag"
directory_fmt = ("{category}", "{subcategory}", "{tag}")
pattern = BASE_PATTERN + r"/explore/tags/([^/?#]+)"
@@ -502,7 +565,6 @@ class InstagramPostExtractor(InstagramExtractor):
"width": int,
}
}),
# GraphSidecar
("https://www.instagram.com/p/BoHk1haB5tM/", {
"count": 5,
@@ -517,7 +579,6 @@ class InstagramPostExtractor(InstagramExtractor):
"username": "instagram",
}
}),
# GraphVideo
("https://www.instagram.com/p/Bqxp0VSBgJg/", {
"pattern": r"/46840863_726311431074534_7805566102611403091_n\.mp4",
@@ -535,7 +596,6 @@ class InstagramPostExtractor(InstagramExtractor):
"width": int,
}
}),
# GraphVideo (IGTV)
("https://www.instagram.com/tv/BkQjCfsBIzi/", {
"pattern": r"/10000000_597132547321814_702169244961988209_n\.mp4",
@@ -552,7 +612,6 @@ class InstagramPostExtractor(InstagramExtractor):
"width": int,
}
}),
# GraphSidecar with 2 embedded GraphVideo objects
("https://www.instagram.com/p/BtOvDOfhvRr/", {
"count": 2,
@@ -563,7 +622,6 @@ class InstagramPostExtractor(InstagramExtractor):
"video_url": str,
}
}),
# GraphImage with tagged user
("https://www.instagram.com/p/B_2lf3qAd3y/", {
"keyword": {
@@ -574,10 +632,8 @@ class InstagramPostExtractor(InstagramExtractor):
}]
}
}),
# URL with username (#2085)
("https://www.instagram.com/dm/p/CW042g7B9CY/"),
("https://www.instagram.com/reel/CDg_6Y1pxWu/"),
)
@@ -585,68 +641,6 @@ class InstagramPostExtractor(InstagramExtractor):
return self.api.media(id_from_shortcode(self.item))
class InstagramStoriesExtractor(InstagramExtractor):
"""Extractor for Instagram stories"""
subcategory = "stories"
pattern = (r"(?:https?://)?(?:www\.)?instagram\.com"
r"/stories/(?:highlights/(\d+)|([^/?#]+)(?:/(\d+))?)")
test = (
("https://www.instagram.com/stories/instagram/"),
("https://www.instagram.com/stories/highlights/18042509488170095/"),
("https://instagram.com/stories/geekmig/2724343156064789461"),
)
def __init__(self, match):
self.highlight_id, self.user, self.media_id = match.groups()
if self.highlight_id:
self.subcategory = InstagramHighlightsExtractor.subcategory
InstagramExtractor.__init__(self, match)
def posts(self):
if self.highlight_id:
reel_id = "highlight:" + self.highlight_id
else:
reel_id = self.api.user_id(self.user)
reels = self.api.reels_media(reel_id)
if self.media_id and reels:
reel = reels[0]
for item in reel["items"]:
if item["pk"] == self.media_id:
reel["items"] = (item,)
break
else:
raise exception.NotFoundError("story")
return reels
class InstagramHighlightsExtractor(InstagramExtractor):
"""Extractor for all Instagram story highlights of a user"""
subcategory = "highlights"
pattern = USER_PATTERN + r"/highlights"
test = ("https://www.instagram.com/instagram/highlights",)
def posts(self):
uid = self.api.user_id(self.item)
return self.api.highlights_media(uid)
class InstagramReelsExtractor(InstagramExtractor):
"""Extractor for an Instagram user's reels"""
subcategory = "reels"
pattern = USER_PATTERN + r"/reels"
test = ("https://www.instagram.com/instagram/reels/", {
"range": "40-60",
"count": ">= 20",
})
def posts(self):
uid = self.api.user_id(self.item)
return self.api.user_clips(uid)
class InstagramRestAPI():
def __init__(self, extractor):
@@ -755,7 +749,7 @@ class InstagramRestAPI():
else:
yield from data["items"]
if not data["more_available"]:
if not data.get("more_available"):
return
params["max_id"] = data["next_max_id"]
@@ -820,7 +814,14 @@ class InstagramGraphqlAPI():
"parent_comment_count": 24,
"has_threaded_comments": True,
}
return (self._call(query_hash, variables)["shortcode_media"],)
media = self._call(query_hash, variables).get("shortcode_media")
return (media,) if media else ()
def tags_media(self, tag):
query_hash = "9b498c08113f1e09617a1703c22b2f32"
variables = {"tag_name": text.unescape(tag), "first": 50}
return self._pagination(query_hash, variables,
"hashtag", "edge_hashtag_to_media")
def user_id(self, screen_name):
if screen_name.startswith("id:"):
@@ -837,6 +838,11 @@ class InstagramGraphqlAPI():
variables = {"id": user_id, "first": 50}
return self._pagination(query_hash, variables)
def user_tagged(self, user_id):
query_hash = "be13233562af2d229b008d2976b998b5"
variables = {"id": user_id, "first": 50}
return self._pagination(query_hash, variables)
def _call(self, query_hash, variables):
extr = self.extractor
@@ -862,14 +868,14 @@ class InstagramGraphqlAPI():
url, params=params, headers=headers, cookies=cookies,
).json()["data"]
def _pagination(self, query_hash, variables):
def _pagination(self, query_hash, variables, key="user", edge=None):
cursor = self.extractor.config("cursor")
if cursor:
variables["after"] = cursor
while True:
data = next(iter(self._call(
query_hash, variables)["user"].values()))
data = self._call(query_hash, variables)[key]
data = data[edge] if edge else next(iter(data.values()))
for edge in data["edges"]:
yield edge["node"]