[cookies] cleanup (#1606)

- fix Popen.communicate() calls
- move and simplify _process_chrome_cookie() code
- remove random print() statement and config dict
This commit is contained in:
Mike Fährmann
2022-05-08 01:26:47 +02:00
parent 6742f3bc1e
commit aa2db7abeb

View File

@@ -6,7 +6,7 @@
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
# Adapted freom yt-dlp's cookies module.
# Adapted from yt-dlp's cookies module.
# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/cookies.py
import binascii
@@ -82,32 +82,42 @@ def load_cookies_chrome(cookiejar, browser_name, profile, keyring):
with _chrome_cookies_database(profile, config) as db:
db.text_factory = bytes
decryptor = get_cookie_decryptor(
config["directory"], config["keyring"], keyring=keyring)
failed_cookies = 0
unencrypted_cookies = 0
db.text_factory = bytes
set_cookie = cookiejar.set_cookie
try:
rows = db.execute(
"SELECT host_key, name, value, encrypted_value, path, "
"expires_utc, is_secure FROM cookies")
except sqlite3.OperationalError:
print(1)
rows = db.execute(
"SELECT host_key, name, value, encrypted_value, path, "
"expires_utc, secure FROM cookies")
for row in rows:
is_encrypted, cookie = _process_chrome_cookie(decryptor, *row)
if not cookie:
failed_cookies += 1
continue
elif not is_encrypted:
set_cookie = cookiejar.set_cookie
failed_cookies = unencrypted_cookies = 0
for domain, name, value, enc_value, path, expires, secure in rows:
if not value and enc_value: # encrypted
value = decryptor.decrypt(enc_value)
if value is None:
failed_cookies += 1
continue
else:
value = value.decode()
unencrypted_cookies += 1
set_cookie(cookie)
domain = domain.decode()
path = path.decode()
name = name.decode()
set_cookie(Cookie(
0, name, value, None, False,
domain, bool(domain), domain.startswith("."),
path, bool(path), secure, expires, False, None, None, {},
))
if failed_cookies > 0:
failed_message = " ({} could not be decrypted)".format(failed_cookies)
@@ -329,32 +339,6 @@ def _get_chromium_based_browser_settings(browser_name):
"profiles" : browser_name not in browsers_without_profiles
}
return {
"browser_dir": browser_dir,
"keyring_name": keyring_name,
"supports_profiles": browser_name not in browsers_without_profiles
}
def _process_chrome_cookie(decryptor, host_key, name, value, encrypted_value,
path, expires_utc, is_secure):
host_key = host_key.decode()
name = name.decode()
value = value.decode()
path = path.decode()
is_encrypted = not value and encrypted_value
if is_encrypted:
value = decryptor.decrypt(encrypted_value)
if value is None:
return is_encrypted, None
return is_encrypted, Cookie(
0, name, value, None, False,
host_key, bool(host_key), host_key.startswith("."),
path, bool(path), is_secure, expires_utc, False, None, None, {},
)
class ChromeCookieDecryptor:
"""
@@ -559,14 +543,13 @@ def _get_kwallet_network_wallet():
"""
default_wallet = "kdewallet"
try:
proc = Popen(
proc, stdout = Popen_communicate(
"dbus-send", "--session", "--print-reply=literal",
"--dest=org.kde.kwalletd5",
"/modules/kwalletd5",
"org.kde.KWallet.networkWallet"
)
stdout, stderr = proc.communicate_or_kill()
if proc.returncode != 0:
logger.warning("failed to read NetworkWallet")
return default_wallet
@@ -593,14 +576,13 @@ def _get_kwallet_password(browser_keyring_name):
network_wallet = _get_kwallet_network_wallet()
try:
proc = Popen(
proc, stdout = Popen_communicate(
"kwallet-query",
"--read-password", browser_keyring_name + " Safe Storage",
"--folder", browser_keyring_name + " Keys",
network_wallet,
)
stdout, stderr = proc.communicate_or_kill()
if proc.returncode != 0:
logger.error("kwallet-query failed with return code {}. "
"Please consult the kwallet-query man page "
@@ -680,14 +662,13 @@ def _get_mac_keyring_password(browser_keyring_name):
logger.debug("using find-generic-password to obtain "
"password from OSX keychain")
try:
proc = Popen(
proc, stdout = Popen_communicate(
"security", "find-generic-password",
"-w", # write password to stdout
"-a", browser_keyring_name, # match "account"
"-s", browser_keyring_name + " Safe Storage", # match "service"
)
stdout, stderr = proc.communicate_or_kill()
if stdout[-1:] == b"\n":
stdout = stdout[:-1]
return stdout
@@ -801,9 +782,16 @@ class DatabaseCopy():
self.directory.cleanup()
def Popen(*args):
return subprocess.Popen(
def Popen_communicate(*args):
proc = subprocess.Popen(
args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
try:
stdout, stderr = proc.communicate()
except BaseException: # Including KeyboardInterrupt
proc.kill()
proc.wait()
raise
return proc, stdout
"""
@@ -838,8 +826,8 @@ def _get_linux_desktop_environment(env):
desktop_session = env.get("DESKTOP_SESSION")
if xdg_current_desktop:
xdg_current_desktop = xdg_current_desktop.partition(
":")[0].strip().lower()
xdg_current_desktop = (xdg_current_desktop.partition(":")[0]
.strip().lower())
if xdg_current_desktop == "unity":
if desktop_session and "gnome-fallback" in desktop_session: