[archive] implement support for PostgreSQL databases (#6152)

This commit is contained in:
Mike Fährmann
2025-02-16 17:33:25 +01:00
parent b4eae65965
commit 841bc9f66f
3 changed files with 160 additions and 27 deletions

View File

@@ -922,11 +922,13 @@ Description
extractor.*.archive
-------------------
Type
|Path|_
* ``string``
* |Path|_
Default
``null``
Example
``"$HOME/.archives/{category}.sqlite3"``
* ``"$HOME/.archives/{category}.sqlite3"``
* ``"postgresql://user:pass@host/database"``
Description
File to store IDs of downloaded files in. Downloads of files
already recorded in this archive file will be
@@ -937,6 +939,11 @@ Description
memory requirements are significantly lower when the
amount of stored IDs gets reasonably large.
If this value is a
`PostgreSQL Connection URI <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS>`__,
the archive will use this PostgreSQL database as backend (requires
`Psycopg <https://www.psycopg.org/>`__).
Note: Archive files that do not already exist get generated automatically.
Note: Archive paths support regular `format string`_ replacements,

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2024 Mike Fährmann
# Copyright 2024-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -9,26 +9,55 @@
"""Download Archives"""
import os
import sqlite3
from . import formatter
import logging
from . import util, formatter
log = logging.getLogger("archive")
def connect(path, prefix, format, mode=None, pragma=None, kwdict=None):
keygen = formatter.parse(prefix + format).format_map
if path.startswith(("postgres://", "postgresql://")):
if mode == "memory":
cls = DownloadArchivePostgresqlMemory
else:
cls = DownloadArchivePostgresql
else:
path = util.expand_path(path)
if kwdict is not None and "{" in path:
path = formatter.parse(path).format_map(kwdict)
if mode == "memory":
cls = DownloadArchiveMemory
else:
cls = DownloadArchive
return cls(path, keygen, pragma)
class DownloadArchive():
_sqlite3 = None
def __init__(self, path, keygen, pragma=None, cache_key=None):
if self._sqlite3 is None:
import sqlite3
DownloadArchive._sqlite3 = sqlite3
def __init__(self, path, format_string, pragma=None,
cache_key="_archive_key"):
try:
con = sqlite3.connect(path, timeout=60, check_same_thread=False)
con = self._sqlite3.connect(
path, timeout=60, check_same_thread=False)
except sqlite3.OperationalError:
os.makedirs(os.path.dirname(path))
con = sqlite3.connect(path, timeout=60, check_same_thread=False)
con = self._sqlite3.connect(
path, timeout=60, check_same_thread=False)
con.isolation_level = None
self.keygen = formatter.parse(format_string).format_map
self.keygen = keygen
self.connection = con
self.close = con.close
self.cursor = cursor = con.cursor()
self._cache_key = cache_key
self._cache_key = cache_key or "_archive_key"
if pragma:
for stmt in pragma:
@@ -37,7 +66,7 @@ class DownloadArchive():
try:
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY) WITHOUT ROWID")
except sqlite3.OperationalError:
except self._sqlite3.OperationalError:
# fallback for missing WITHOUT ROWID support (#553)
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY)")
@@ -61,9 +90,9 @@ class DownloadArchive():
class DownloadArchiveMemory(DownloadArchive):
def __init__(self, path, format_string, pragma=None,
cache_key="_archive_key"):
DownloadArchive.__init__(self, path, format_string, pragma, cache_key)
def __init__(self, path, keygen, pragma=None, cache_key=None):
DownloadArchive.__init__(
self, path, keygen, pragma, cache_key)
self.keys = set()
def add(self, kwdict):
@@ -87,7 +116,7 @@ class DownloadArchiveMemory(DownloadArchive):
with self.connection:
try:
cursor.execute("BEGIN")
except sqlite3.OperationalError:
except self._sqlite3.OperationalError:
pass
stmt = "INSERT OR IGNORE INTO archive (entry) VALUES (?)"
@@ -96,3 +125,107 @@ class DownloadArchiveMemory(DownloadArchive):
cursor.execute(stmt, (key,))
else:
cursor.executemany(stmt, ((key,) for key in self.keys))
class DownloadArchivePostgresql():
_psycopg = None
def __init__(self, uri, keygen, pragma=None, cache_key=None):
if self._psycopg is None:
import psycopg
DownloadArchivePostgresql._psycopg = psycopg
self.connection = con = self._psycopg.connect(uri)
self.cursor = cursor = con.cursor()
self.close = con.close
self.keygen = keygen
self._cache_key = cache_key or "_archive_key"
try:
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY)")
con.commit()
except Exception as exc:
log.error("%s: %s when creating 'archive' table: %s",
con, exc.__class__.__name__, exc)
con.rollback()
raise
def add(self, kwdict):
key = kwdict.get(self._cache_key) or self.keygen(kwdict)
try:
self.cursor.execute(
"INSERT INTO archive (entry) "
"VALUES (%s) "
"ON CONFLICT DO NOTHING",
(key,))
self.connection.commit()
except Exception as exc:
log.error("%s: %s when writing entry: %s",
self.connection, exc.__class__.__name__, exc)
self.connection.rollback()
def check(self, kwdict):
key = kwdict[self._cache_key] = self.keygen(kwdict)
try:
self.cursor.execute(
"SELECT true "
"FROM archive "
"WHERE entry=%s "
"LIMIT 1",
(key,))
return self.cursor.fetchone()
except Exception as exc:
log.error("%s: %s when checking entry: %s",
self.connection, exc.__class__.__name__, exc)
self.connection.rollback()
return False
def finalize(self):
pass
class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
def __init__(self, path, keygen, pragma=None, cache_key=None):
DownloadArchivePostgresql.__init__(
self, path, keygen, pragma, cache_key)
self.keys = set()
def add(self, kwdict):
self.keys.add(
kwdict.get(self._cache_key) or
self.keygen(kwdict))
def check(self, kwdict):
key = kwdict[self._cache_key] = self.keygen(kwdict)
if key in self.keys:
return True
try:
self.cursor.execute(
"SELECT true "
"FROM archive "
"WHERE entry=%s "
"LIMIT 1",
(key,))
return self.cursor.fetchone()
except Exception as exc:
log.error("%s: %s when checking entry: %s",
self.connection, exc.__class__.__name__, exc)
self.connection.rollback()
return False
def finalize(self):
if not self.keys:
return
try:
self.cursor.executemany(
"INSERT INTO archive (entry) "
"VALUES (%s) "
"ON CONFLICT DO NOTHING",
((key,) for key in self.keys))
self.connection.commit()
except Exception as exc:
log.error("%s: %s when writing entries: %s",
self.connection, exc.__class__.__name__, exc)
self.connection.rollback()

View File

@@ -551,8 +551,6 @@ class DownloadJob(Job):
archive_path = cfg("archive")
if archive_path:
archive_path = util.expand_path(archive_path)
archive_prefix = cfg("archive-prefix")
if archive_prefix is None:
archive_prefix = extr.category
@@ -562,16 +560,11 @@ class DownloadJob(Job):
archive_format = extr.archive_fmt
try:
if "{" in archive_path:
archive_path = formatter.parse(
archive_path).format_map(kwdict)
if cfg("archive-mode") == "memory":
archive_cls = archive.DownloadArchiveMemory
else:
archive_cls = archive.DownloadArchive
self.archive = archive_cls(
self.archive = archive.connect(
archive_path,
archive_prefix + archive_format,
archive_prefix,
archive_format,
cfg("archive-mode"),
cfg("archive-pragma"),
)
except Exception as exc: