archive: add PostgreSQL support for download archives

Revised version of the original patch:
- DownloadArchive.__init__ now catches self._sqlite3.OperationalError.
  'sqlite3' is only bound as a local name when the lazy import runs,
  i.e. for the first instance; for later instances the previous
  'except sqlite3.OperationalError' raised UnboundLocalError whenever
  the archive directory had to be created.
- job.py passes 'kwdict' to archive.connect(), so format strings in
  archive paths are evaluated again.
- Link targets in docs/configuration.rst are filled in
  (reconstructed; verify the URLs).

Line breaks and indentation were reconstructed from a
whitespace-collapsed copy. The blob hashes on the 'index' lines are
those of the original patch.

diff --git a/docs/configuration.rst b/docs/configuration.rst
index f73ccb4f..7985f54d 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -922,11 +922,13 @@ Description
 extractor.*.archive
 -------------------
 Type
-    |Path|_
+    * ``string``
+    * |Path|_
 Default
     ``null``
 Example
-    ``"$HOME/.archives/{category}.sqlite3"``
+    * ``"$HOME/.archives/{category}.sqlite3"``
+    * ``"postgresql://user:pass@host/database"``
 Description
     File to store IDs of downloaded files in. Downloads of files
     already recorded in this archive file will be
@@ -937,6 +939,11 @@ Description
     memory requirements are significantly lower when the
     amount of stored IDs gets reasonably large.
 
+    If this value is a
+    `PostgreSQL Connection URI <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS>`__,
+    the archive will use this PostgreSQL database as backend (requires
+    `Psycopg <https://www.psycopg.org/>`__).
+
     Note: Archive files that do not already exist get generated automatically.
 
     Note: Archive paths support regular `format string`_ replacements,
diff --git a/gallery_dl/archive.py b/gallery_dl/archive.py
index 5f05bbfd..bd35895d 100644
--- a/gallery_dl/archive.py
+++ b/gallery_dl/archive.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 
-# Copyright 2024 Mike Fährmann
+# Copyright 2024-2025 Mike Fährmann
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License version 2 as
@@ -9,26 +9,63 @@
 """Download Archives"""
 
 import os
-import sqlite3
-from . import formatter
+import logging
+from . import util, formatter
+
+log = logging.getLogger("archive")
+
+
+def connect(path, prefix, format, mode=None, pragma=None, kwdict=None):
+    """Return a download archive object for 'path'
+
+    A 'path' starting with "postgres://" or "postgresql://" selects a
+    PostgreSQL backend. Anything else is treated as an SQLite3 file
+    path, which gets expanded and, if 'kwdict' is given and the path
+    contains "{", evaluated as a format string with 'kwdict'.
+    A 'mode' of "memory" selects the backend's '...Memory' class.
+    """
+    keygen = formatter.parse(prefix + format).format_map
+
+    if path.startswith(("postgres://", "postgresql://")):
+        if mode == "memory":
+            cls = DownloadArchivePostgresqlMemory
+        else:
+            cls = DownloadArchivePostgresql
+    else:
+        path = util.expand_path(path)
+        if kwdict is not None and "{" in path:
+            path = formatter.parse(path).format_map(kwdict)
+        if mode == "memory":
+            cls = DownloadArchiveMemory
+        else:
+            cls = DownloadArchive
+
+    return cls(path, keygen, pragma)
 
 
 class DownloadArchive():
+    _sqlite3 = None
+
+    def __init__(self, path, keygen, pragma=None, cache_key=None):
+
+        if self._sqlite3 is None:
+            import sqlite3
+            DownloadArchive._sqlite3 = sqlite3
 
-    def __init__(self, path, format_string, pragma=None,
-                 cache_key="_archive_key"):
         try:
-            con = sqlite3.connect(path, timeout=60, check_same_thread=False)
+            con = self._sqlite3.connect(
+                path, timeout=60, check_same_thread=False)
-        except sqlite3.OperationalError:
+        except self._sqlite3.OperationalError:
             os.makedirs(os.path.dirname(path))
-            con = sqlite3.connect(path, timeout=60, check_same_thread=False)
+            con = self._sqlite3.connect(
+                path, timeout=60, check_same_thread=False)
         con.isolation_level = None
 
-        self.keygen = formatter.parse(format_string).format_map
+        self.keygen = keygen
         self.connection = con
         self.close = con.close
         self.cursor = cursor = con.cursor()
-        self._cache_key = cache_key
+        self._cache_key = cache_key or "_archive_key"
 
         if pragma:
             for stmt in pragma:
@@ -37,7 +74,7 @@ class DownloadArchive():
         try:
             cursor.execute("CREATE TABLE IF NOT EXISTS archive "
                            "(entry TEXT PRIMARY KEY) WITHOUT ROWID")
-        except sqlite3.OperationalError:
+        except self._sqlite3.OperationalError:
             # fallback for missing WITHOUT ROWID support (#553)
             cursor.execute("CREATE TABLE IF NOT EXISTS archive "
                            "(entry TEXT PRIMARY KEY)")
@@ -61,9 +98,9 @@ class DownloadArchive():
 
 class DownloadArchiveMemory(DownloadArchive):
 
-    def __init__(self, path, format_string, pragma=None,
-                 cache_key="_archive_key"):
-        DownloadArchive.__init__(self, path, format_string, pragma, cache_key)
+    def __init__(self, path, keygen, pragma=None, cache_key=None):
+        DownloadArchive.__init__(
+            self, path, keygen, pragma, cache_key)
         self.keys = set()
 
     def add(self, kwdict):
@@ -87,7 +124,7 @@ class DownloadArchiveMemory(DownloadArchive):
         with self.connection:
             try:
                 cursor.execute("BEGIN")
-            except sqlite3.OperationalError:
+            except self._sqlite3.OperationalError:
                 pass
 
             stmt = "INSERT OR IGNORE INTO archive (entry) VALUES (?)"
@@ -96,3 +133,109 @@ class DownloadArchiveMemory(DownloadArchive):
                     cursor.execute(stmt, (key,))
             else:
                 cursor.executemany(stmt, ((key,) for key in self.keys))
+
+
+class DownloadArchivePostgresql():
+    """Download archive backed by a PostgreSQL database"""
+    _psycopg = None
+
+    def __init__(self, uri, keygen, pragma=None, cache_key=None):
+        if self._psycopg is None:
+            import psycopg
+            DownloadArchivePostgresql._psycopg = psycopg
+
+        self.connection = con = self._psycopg.connect(uri)
+        self.cursor = cursor = con.cursor()
+        self.close = con.close
+        self.keygen = keygen
+        self._cache_key = cache_key or "_archive_key"
+
+        try:
+            cursor.execute("CREATE TABLE IF NOT EXISTS archive "
+                           "(entry TEXT PRIMARY KEY)")
+            con.commit()
+        except Exception as exc:
+            log.error("%s: %s when creating 'archive' table: %s",
+                      con, exc.__class__.__name__, exc)
+            con.rollback()
+            raise
+
+    def add(self, kwdict):
+        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
+        try:
+            self.cursor.execute(
+                "INSERT INTO archive (entry) "
+                "VALUES (%s) "
+                "ON CONFLICT DO NOTHING",
+                (key,))
+            self.connection.commit()
+        except Exception as exc:
+            log.error("%s: %s when writing entry: %s",
+                      self.connection, exc.__class__.__name__, exc)
+            self.connection.rollback()
+
+    def check(self, kwdict):
+        key = kwdict[self._cache_key] = self.keygen(kwdict)
+        try:
+            self.cursor.execute(
+                "SELECT true "
+                "FROM archive "
+                "WHERE entry=%s "
+                "LIMIT 1",
+                (key,))
+            return self.cursor.fetchone()
+        except Exception as exc:
+            log.error("%s: %s when checking entry: %s",
+                      self.connection, exc.__class__.__name__, exc)
+            self.connection.rollback()
+            return False
+
+    def finalize(self):
+        pass
+
+
+class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
+    """PostgreSQL archive that buffers new entries until finalize()"""
+
+    def __init__(self, path, keygen, pragma=None, cache_key=None):
+        DownloadArchivePostgresql.__init__(
+            self, path, keygen, pragma, cache_key)
+        self.keys = set()
+
+    def add(self, kwdict):
+        self.keys.add(
+            kwdict.get(self._cache_key) or
+            self.keygen(kwdict))
+
+    def check(self, kwdict):
+        key = kwdict[self._cache_key] = self.keygen(kwdict)
+        if key in self.keys:
+            return True
+        try:
+            self.cursor.execute(
+                "SELECT true "
+                "FROM archive "
+                "WHERE entry=%s "
+                "LIMIT 1",
+                (key,))
+            return self.cursor.fetchone()
+        except Exception as exc:
+            log.error("%s: %s when checking entry: %s",
+                      self.connection, exc.__class__.__name__, exc)
+            self.connection.rollback()
+            return False
+
+    def finalize(self):
+        if not self.keys:
+            return
+        try:
+            self.cursor.executemany(
+                "INSERT INTO archive (entry) "
+                "VALUES (%s) "
+                "ON CONFLICT DO NOTHING",
+                ((key,) for key in self.keys))
+            self.connection.commit()
+        except Exception as exc:
+            log.error("%s: %s when writing entries: %s",
+                      self.connection, exc.__class__.__name__, exc)
+            self.connection.rollback()
diff --git a/gallery_dl/job.py b/gallery_dl/job.py
index 29149270..57b7c3ca 100644
--- a/gallery_dl/job.py
+++ b/gallery_dl/job.py
@@ -551,8 +551,6 @@ class DownloadJob(Job):
 
         archive_path = cfg("archive")
         if archive_path:
-            archive_path = util.expand_path(archive_path)
-
             archive_prefix = cfg("archive-prefix")
             if archive_prefix is None:
                 archive_prefix = extr.category
@@ -562,16 +560,12 @@ class DownloadJob(Job):
                 archive_format = extr.archive_fmt
 
             try:
-                if "{" in archive_path:
-                    archive_path = formatter.parse(
-                        archive_path).format_map(kwdict)
-                if cfg("archive-mode") == "memory":
-                    archive_cls = archive.DownloadArchiveMemory
-                else:
-                    archive_cls = archive.DownloadArchive
-                self.archive = archive_cls(
+                self.archive = archive.connect(
                     archive_path,
-                    archive_prefix + archive_format,
+                    archive_prefix,
+                    archive_format,
+                    cfg("archive-mode"),
                     cfg("archive-pragma"),
+                    kwdict,
                 )
             except Exception as exc: