From ae1b24aa6a595b7a370fe1f6cd877c7526d49719 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20F=C3=A4hrmann?= Date: Fri, 10 Jun 2022 22:43:11 +0200 Subject: [PATCH] [instagram] automatically invalidate expired login sessions --- gallery_dl/extractor/instagram.py | 95 +++++++++++++++++-------------- 1 file changed, 51 insertions(+), 44 deletions(-) diff --git a/gallery_dl/extractor/instagram.py b/gallery_dl/extractor/instagram.py index 44326b69..8d76260a 100644 --- a/gallery_dl/extractor/instagram.py +++ b/gallery_dl/extractor/instagram.py @@ -82,8 +82,12 @@ class InstagramExtractor(Extractor): if response.history: - url = response.request.url + url = response.url if "/accounts/login/" in url: + if self._username: + self.log.debug("Invalidating cached login session for " + "'%s'", self._username) + _login_impl.invalidate(self._username) page = "login" elif "/challenge/" in url: page = "challenge" @@ -161,55 +165,15 @@ class InstagramExtractor(Extractor): return self._pagination_api(endpoint) def login(self): + self._username = None if not self._check_cookies(self.cookienames): username, password = self._get_auth_info() if username: - self._update_cookies(self._login_impl(username, password)) + self._username = username + self._update_cookies(_login_impl(self, username, password)) self.session.cookies.set( "csrftoken", self.csrf_token, domain=self.cookiedomain) - @cache(maxage=360*24*3600, keyarg=1) - def _login_impl(self, username, password): - self.log.info("Logging in as %s", username) - - url = self.root + "/accounts/login/" - page = self.request(url).text - - headers = { - "X-Web-Device-Id" : text.extract(page, '"device_id":"', '"')[0], - "X-IG-App-ID" : "936619743392459", - "X-ASBD-ID" : "437806", - "X-IG-WWW-Claim" : "0", - "X-Requested-With": "XMLHttpRequest", - "Referer" : url, - } - url = self.root + "/data/shared_data/" - data = self.request(url, headers=headers).json() - - headers["X-CSRFToken"] = data["config"]["csrf_token"] - headers["X-Instagram-AJAX"] = data["rollout_hash"] - headers["Origin"] = self.root - data = { - "username" : username, - "enc_password" : "#PWD_INSTAGRAM_BROWSER:0:{}:{}".format( - int(time.time()), password), - "queryParams" : "{}", - "optIntoOneTap" : "false", - "stopDeletionNonce" : "", - "trustedDeviceRecords": "{}", - } - url = self.root + "/accounts/login/ajax/" - response = self.request(url, method="POST", headers=headers, data=data) - - if not response.json().get("authenticated"): - raise exception.AuthenticationError() - - cget = self.session.cookies.get - return { - name: cget(name) - for name in ("sessionid", "mid", "ig_did") - } - def _parse_post_graphql(self, post): typename = post["__typename"] @@ -761,6 +725,49 @@ class InstagramReelsExtractor(InstagramExtractor): return self._pagination_api_post(endpoint, data) +@cache(maxage=360*24*3600, keyarg=1) +def _login_impl(extr, username, password): + extr.log.info("Logging in as %s", username) + + url = extr.root + "/accounts/login/" + page = extr.request(url).text + + headers = { + "X-Web-Device-Id" : text.extract(page, '"device_id":"', '"')[0], + "X-IG-App-ID" : "936619743392459", + "X-ASBD-ID" : "437806", + "X-IG-WWW-Claim" : "0", + "X-Requested-With": "XMLHttpRequest", + "Referer" : url, + } + url = extr.root + "/data/shared_data/" + data = extr.request(url, headers=headers).json() + + headers["X-CSRFToken"] = data["config"]["csrf_token"] + headers["X-Instagram-AJAX"] = data["rollout_hash"] + headers["Origin"] = extr.root + data = { + "username" : username, + "enc_password" : "#PWD_INSTAGRAM_BROWSER:0:{}:{}".format( + int(time.time()), password), + "queryParams" : "{}", + "optIntoOneTap" : "false", + "stopDeletionNonce" : "", + "trustedDeviceRecords": "{}", + } + url = extr.root + "/accounts/login/ajax/" + response = extr.request(url, method="POST", headers=headers, data=data) + + if not response.json().get("authenticated"): + raise exception.AuthenticationError() + + cget = extr.session.cookies.get + return { + name: cget(name) + for name in ("sessionid", "mid", "ig_did") + } + + def id_from_shortcode(shortcode): return util.bdecode(shortcode, _ALPHABET)