[instagram] fix and update extractors (#2644)

- use different way to fetch user IDs
- use new API endpoints for /tagged/ and single posts
This commit is contained in:
Mike Fährmann
2022-06-01 22:05:45 +02:00
parent 05d4a0215a
commit 2fb01938f4

View File

@@ -11,7 +11,7 @@
from .common import Extractor, Message
from .. import text, util, exception
from ..cache import cache
from ..cache import cache, memcache
import json
import time
import re
@@ -134,6 +134,21 @@ class InstagramExtractor(Extractor):
url, params=params, headers=headers, cookies=cookies,
).json()["data"]
@memcache(keyarg=1)
def _user_by_screen_name(self, screen_name):
    """Request the profile JSON of 'screen_name' and return its user object.

    The return value is the ["graphql"]["user"] entry of the JSON response.
    NOTE(review): memcache(keyarg=1) presumably caches per 'screen_name'
    (argument index 1, after self) - confirm against ..cache.memcache.
    """
    # The profile page URL doubles as the Referer of the JSON request.
    profile_url = "https://www.instagram.com/{}/".format(screen_name)
    response = self.request(
        "https://www.instagram.com/{}/?__a=1&__d=dis".format(screen_name),
        headers={
            "Referer"         : profile_url,
            "X-IG-App-ID"     : "936619743392459",
            "X-Requested-With": "XMLHttpRequest",
        },
    )
    return response.json()["graphql"]["user"]
def _media_by_id(self, post_id):
    """Return the result of _pagination_api() for the media info endpoint
    of the post with the given numeric ID.
    """
    return self._pagination_api("/v1/media/{}/info/".format(post_id))
def login(self):
if not self._check_cookies(self.cookienames):
username, password = self._get_auth_info()
@@ -186,19 +201,15 @@ class InstagramExtractor(Extractor):
def _parse_post_graphql(self, post):
typename = post["__typename"]
if post.get("is_video") and "video_url" not in post:
url = "{}/tv/{}/".format(self.root, post["shortcode"])
post = self._extract_post_page(url)
if "items" in post:
return self._parse_post_api({"media": post["items"][0]})
post = post["graphql"]["shortcode_media"]
elif typename == "GraphSidecar" and \
media = next(self._media_by_id(post["id"]))
return self._parse_post_api(media)
if typename == "GraphSidecar" and \
"edge_sidecar_to_children" not in post:
url = "{}/p/{}/".format(self.root, post["shortcode"])
post = self._extract_post_page(url)
if "items" in post:
return self._parse_post_api({"media": post["items"][0]})
post = post["graphql"]["shortcode_media"]
media = next(self._media_by_id(post["id"]))
return self._parse_post_api(media)
owner = post["owner"]
data = {
@@ -238,7 +249,7 @@ class InstagramExtractor(Extractor):
"num": num,
"media_id" : node["id"],
"shortcode" : (node.get("shortcode") or
self._shortcode_from_id(node["id"])),
shortcode_from_id(node["id"])),
"display_url": node["display_url"],
"video_url" : node.get("video_url"),
"width" : dimensions["width"],
@@ -270,7 +281,7 @@ class InstagramExtractor(Extractor):
owner = media["user"]
data = {
"post_id" : media["pk"],
"post_shortcode": self._shortcode_from_id(media["pk"]),
"post_shortcode": shortcode_from_id(media["pk"]),
}
if "carousel_media" in media:
@@ -286,7 +297,7 @@ class InstagramExtractor(Extractor):
data = {
"expires" : text.parse_timestamp(post.get("expiring_at")),
"post_id" : reel_id,
"post_shortcode": self._shortcode_from_id(reel_id),
"post_shortcode": shortcode_from_id(reel_id),
}
data["owner_id"] = owner["pk"]
@@ -314,7 +325,7 @@ class InstagramExtractor(Extractor):
media.get("taken_at")),
"media_id" : item["pk"],
"shortcode" : (item.get("code") or
self._shortcode_from_id(item["pk"])),
shortcode_from_id(item["pk"])),
"display_url": image["url"],
"video_url" : video["url"] if video else None,
"width" : media["width"],
@@ -325,14 +336,6 @@ class InstagramExtractor(Extractor):
return data
@staticmethod
def _shortcode_from_id(post_id):
return util.bencode(
int(post_id),
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789-_")
@staticmethod
def _extract_tagged_users(src, dest):
dest["tagged_users"] = tagged_users = []
@@ -383,13 +386,6 @@ class InstagramExtractor(Extractor):
json.loads(additional_data.partition(",")[2])
return data
def _extract_profile_page(self, url):
page = self.request(url).text
data = self._extract_shared_data(page)["entry_data"]
if "HttpErrorPage" in data:
raise exception.NotFoundError("user")
return data["ProfilePage"][0]["graphql"]["user"]
def _extract_post_page(self, url):
page = self.request(url).text
data = self._extract_shared_data(page)["entry_data"]
@@ -410,25 +406,40 @@ class InstagramExtractor(Extractor):
}
return user[key]
def _pagination_graphql(self, query_hash, variables, data):
def _pagination_graphql(self, query_hash, variables):
cursor = self.config("cursor")
if cursor:
variables["after"] = cursor
while True:
data = next(iter(self._request_graphql(
query_hash, variables)["user"].values()))
for edge in data["edges"]:
yield edge["node"]
info = data["page_info"]
if not info["has_next_page"]:
return
elif not data["edges"] and "_virtual" not in info:
elif not data["edges"]:
s = "" if self.item.endswith("s") else "s"
raise exception.StopExtraction(
"%s'%s posts are private", self.item, s)
variables["after"] = self._cursor = info["end_cursor"]
self.log.debug("Cursor: %s", self._cursor)
data = next(iter(self._request_graphql(
query_hash, variables)["user"].values()))
def _pagination_api(self, endpoint, params):
def _pagination_api(self, endpoint, params=None):
    """Yield {"media": item} for every item of a paginated API endpoint.

    Arguments:
        endpoint: API endpoint path passed on to _request_api().
        params: optional dict of query parameters. When given, it is
            updated in place with the "max_id" cursor of each next page.

    Requests continue until a response reports a falsy "more_available".
    """
    while True:
        data = self._request_api(endpoint, params=params)
        for item in data["items"]:
            yield {"media": item}

        if not data["more_available"]:
            return
        # 'params' defaults to None; create the dict only when a cursor
        # actually has to be stored, instead of failing with a TypeError.
        if params is None:
            params = {}
        params["max_id"] = data["next_max_id"]
def _pagination_api_post(self, endpoint, params, post=False):
while True:
data = self._request_api(endpoint, method="POST", data=params)
yield from data["items"]
@@ -471,13 +482,11 @@ class InstagramPostsExtractor(InstagramExtractor):
})
def posts(self):
url = "{}/{}/".format(self.root, self.item)
user = self._extract_profile_page(url)
user = self._user_by_screen_name(self.item)
query_hash = "8c2a529969ee035a5063f2fc8602a0fd"
query_hash = "69cba40317214236af40e7efa697781d"
variables = {"id": user["id"], "first": 50}
edge = self._get_edge_data(user, "edge_owner_to_timeline_media")
return self._pagination_graphql(query_hash, variables, edge)
return self._pagination_graphql(query_hash, variables)
class InstagramTaggedExtractor(InstagramExtractor):
@@ -495,8 +504,7 @@ class InstagramTaggedExtractor(InstagramExtractor):
})
def metadata(self):
url = "{}/{}/".format(self.root, self.item)
self.user = user = self._extract_profile_page(url)
self.user = user = self._user_by_screen_name(self.item)
return {
"tagged_owner_id" : user["id"],
@@ -505,10 +513,9 @@ class InstagramTaggedExtractor(InstagramExtractor):
}
def posts(self):
query_hash = "be13233562af2d229b008d2976b998b5"
variables = {"id": self.user["id"], "first": 50}
edge = self._get_edge_data(self.user, None)
return self._pagination_graphql(query_hash, variables, edge)
endpoint = "/v1/usertags/{}/feed/".format(self.user["id"])
params = {"count": 50}
return self._pagination_api(endpoint, params)
class InstagramChannelExtractor(InstagramExtractor):
@@ -521,13 +528,11 @@ class InstagramChannelExtractor(InstagramExtractor):
})
def posts(self):
url = "{}/{}/channel/".format(self.root, self.item)
user = self._extract_profile_page(url)
user = self._user_by_screen_name(self.item)
query_hash = "bc78b344a68ed16dd5d7f264681c4c76"
variables = {"id": user["id"], "first": 50}
edge = self._get_edge_data(user, "edge_felix_video_timeline")
return self._pagination_graphql(query_hash, variables, edge)
return self._pagination_graphql(query_hash, variables)
class InstagramSavedExtractor(InstagramExtractor):
@@ -537,13 +542,11 @@ class InstagramSavedExtractor(InstagramExtractor):
test = ("https://www.instagram.com/instagram/saved/",)
def posts(self):
url = "{}/{}/saved/".format(self.root, self.item)
user = self._extract_profile_page(url)
user = self._user_by_screen_name(self.item)
query_hash = "2ce1d673055b99250e93b6f88f878fde"
variables = {"id": user["id"], "first": 50}
edge = self._get_edge_data(user, "edge_saved_media")
return self._pagination_graphql(query_hash, variables, edge)
return self._pagination_graphql(query_hash, variables)
class InstagramTagExtractor(InstagramExtractor):
@@ -719,19 +722,7 @@ class InstagramPostExtractor(InstagramExtractor):
)
def posts(self):
query_hash = "2efa04f61586458cef44441f474eee7c"
variables = {
"shortcode" : self.item,
"child_comment_count" : 3,
"fetch_comment_count" : 40,
"parent_comment_count" : 24,
"has_threaded_comments": True,
}
data = self._request_graphql(query_hash, variables)
media = data.get("shortcode_media")
if not media:
raise exception.NotFoundError("post")
return (media,)
return self._media_by_id(id_from_shortcode(self.item))
class InstagramStoriesExtractor(InstagramExtractor):
@@ -790,8 +781,7 @@ class InstagramHighlightsExtractor(InstagramExtractor):
test = ("https://www.instagram.com/instagram/highlights",)
def posts(self):
url = "{}/{}/".format(self.root, self.item)
user = self._extract_profile_page(url)
user = self._user_by_screen_name(self.item)
endpoint = "/v1/highlights/{}/highlights_tray/".format(user["id"])
tray = self._request_api(endpoint)["tray"]
@@ -820,13 +810,23 @@ class InstagramReelsExtractor(InstagramExtractor):
})
def posts(self):
url = "{}/{}/".format(self.root, self.item)
user = self._extract_profile_page(url)
endpoint = "/v1/clips/user/"
data = {
"target_user_id": user["id"],
"target_user_id": self._user_by_screen_name(self.item)["id"],
"page_size" : "50",
}
return self._pagination_api(endpoint, data)
return self._pagination_api_post(endpoint, data)
def id_from_shortcode(shortcode):
    """Decode a post shortcode with util.bdecode() using _ALPHABET.

    Counterpart of shortcode_from_id().
    """
    post_id = util.bdecode(shortcode, _ALPHABET)
    return post_id
def shortcode_from_id(post_id):
    """Encode a post ID with util.bencode() using _ALPHABET.

    'post_id' is converted with int() first, so numeric strings are
    accepted as well. Counterpart of id_from_shortcode().
    """
    numeric_id = int(post_id)
    return util.bencode(numeric_id, _ALPHABET)
# 64 symbols (A-Z, a-z, 0-9, "-", "_") used by id_from_shortcode() and
# shortcode_from_id() to convert between post IDs and shortcodes.
_ALPHABET = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789-_"
)