[archive] implement support for PostgreSQL databases (#6152)

This commit is contained in:
Mike Fährmann
2025-02-16 17:33:25 +01:00
parent b4eae65965
commit 841bc9f66f
3 changed files with 160 additions and 27 deletions

View File

@@ -922,11 +922,13 @@ Description
extractor.*.archive extractor.*.archive
------------------- -------------------
Type Type
|Path|_ * ``string``
* |Path|_
Default Default
``null`` ``null``
Example Example
``"$HOME/.archives/{category}.sqlite3"`` * ``"$HOME/.archives/{category}.sqlite3"``
* ``"postgresql://user:pass@host/database"``
Description Description
File to store IDs of downloaded files in. Downloads of files File to store IDs of downloaded files in. Downloads of files
already recorded in this archive file will be already recorded in this archive file will be
@@ -937,6 +939,11 @@ Description
memory requirements are significantly lower when the memory requirements are significantly lower when the
amount of stored IDs gets reasonably large. amount of stored IDs gets reasonably large.
If this value is a
`PostgreSQL Connection URI <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS>`__,
the archive will use this PostgreSQL database as backend (requires
`Psycopg <https://www.psycopg.org/>`__).
Note: Archive files that do not already exist get generated automatically. Note: Archive files that do not already exist get generated automatically.
Note: Archive paths support regular `format string`_ replacements, Note: Archive paths support regular `format string`_ replacements,

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2024 Mike Fährmann # Copyright 2024-2025 Mike Fährmann
# #
# This program is free software; you can redistribute it and/or modify # This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as # it under the terms of the GNU General Public License version 2 as
@@ -9,26 +9,55 @@
"""Download Archives""" """Download Archives"""
import os import os
import sqlite3 import logging
from . import formatter from . import util, formatter
log = logging.getLogger("archive")
def connect(path, prefix, format, mode=None, pragma=None, kwdict=None):
keygen = formatter.parse(prefix + format).format_map
if path.startswith(("postgres://", "postgresql://")):
if mode == "memory":
cls = DownloadArchivePostgresqlMemory
else:
cls = DownloadArchivePostgresql
else:
path = util.expand_path(path)
if kwdict is not None and "{" in path:
path = formatter.parse(path).format_map(kwdict)
if mode == "memory":
cls = DownloadArchiveMemory
else:
cls = DownloadArchive
return cls(path, keygen, pragma)
class DownloadArchive(): class DownloadArchive():
_sqlite3 = None
def __init__(self, path, keygen, pragma=None, cache_key=None):
if self._sqlite3 is None:
import sqlite3
DownloadArchive._sqlite3 = sqlite3
def __init__(self, path, format_string, pragma=None,
cache_key="_archive_key"):
try: try:
con = sqlite3.connect(path, timeout=60, check_same_thread=False) con = self._sqlite3.connect(
path, timeout=60, check_same_thread=False)
except sqlite3.OperationalError: except sqlite3.OperationalError:
os.makedirs(os.path.dirname(path)) os.makedirs(os.path.dirname(path))
con = sqlite3.connect(path, timeout=60, check_same_thread=False) con = self._sqlite3.connect(
path, timeout=60, check_same_thread=False)
con.isolation_level = None con.isolation_level = None
self.keygen = formatter.parse(format_string).format_map self.keygen = keygen
self.connection = con self.connection = con
self.close = con.close self.close = con.close
self.cursor = cursor = con.cursor() self.cursor = cursor = con.cursor()
self._cache_key = cache_key self._cache_key = cache_key or "_archive_key"
if pragma: if pragma:
for stmt in pragma: for stmt in pragma:
@@ -37,7 +66,7 @@ class DownloadArchive():
try: try:
cursor.execute("CREATE TABLE IF NOT EXISTS archive " cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY) WITHOUT ROWID") "(entry TEXT PRIMARY KEY) WITHOUT ROWID")
except sqlite3.OperationalError: except self._sqlite3.OperationalError:
# fallback for missing WITHOUT ROWID support (#553) # fallback for missing WITHOUT ROWID support (#553)
cursor.execute("CREATE TABLE IF NOT EXISTS archive " cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY)") "(entry TEXT PRIMARY KEY)")
@@ -61,9 +90,9 @@ class DownloadArchive():
class DownloadArchiveMemory(DownloadArchive): class DownloadArchiveMemory(DownloadArchive):
def __init__(self, path, format_string, pragma=None, def __init__(self, path, keygen, pragma=None, cache_key=None):
cache_key="_archive_key"): DownloadArchive.__init__(
DownloadArchive.__init__(self, path, format_string, pragma, cache_key) self, path, keygen, pragma, cache_key)
self.keys = set() self.keys = set()
def add(self, kwdict): def add(self, kwdict):
@@ -87,7 +116,7 @@ class DownloadArchiveMemory(DownloadArchive):
with self.connection: with self.connection:
try: try:
cursor.execute("BEGIN") cursor.execute("BEGIN")
except sqlite3.OperationalError: except self._sqlite3.OperationalError:
pass pass
stmt = "INSERT OR IGNORE INTO archive (entry) VALUES (?)" stmt = "INSERT OR IGNORE INTO archive (entry) VALUES (?)"
@@ -96,3 +125,107 @@ class DownloadArchiveMemory(DownloadArchive):
cursor.execute(stmt, (key,)) cursor.execute(stmt, (key,))
else: else:
cursor.executemany(stmt, ((key,) for key in self.keys)) cursor.executemany(stmt, ((key,) for key in self.keys))
class DownloadArchivePostgresql():
_psycopg = None
def __init__(self, uri, keygen, pragma=None, cache_key=None):
if self._psycopg is None:
import psycopg
DownloadArchivePostgresql._psycopg = psycopg
self.connection = con = self._psycopg.connect(uri)
self.cursor = cursor = con.cursor()
self.close = con.close
self.keygen = keygen
self._cache_key = cache_key or "_archive_key"
try:
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY)")
con.commit()
except Exception as exc:
log.error("%s: %s when creating 'archive' table: %s",
con, exc.__class__.__name__, exc)
con.rollback()
raise
def add(self, kwdict):
key = kwdict.get(self._cache_key) or self.keygen(kwdict)
try:
self.cursor.execute(
"INSERT INTO archive (entry) "
"VALUES (%s) "
"ON CONFLICT DO NOTHING",
(key,))
self.connection.commit()
except Exception as exc:
log.error("%s: %s when writing entry: %s",
self.connection, exc.__class__.__name__, exc)
self.connection.rollback()
def check(self, kwdict):
key = kwdict[self._cache_key] = self.keygen(kwdict)
try:
self.cursor.execute(
"SELECT true "
"FROM archive "
"WHERE entry=%s "
"LIMIT 1",
(key,))
return self.cursor.fetchone()
except Exception as exc:
log.error("%s: %s when checking entry: %s",
self.connection, exc.__class__.__name__, exc)
self.connection.rollback()
return False
def finalize(self):
pass
class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
def __init__(self, path, keygen, pragma=None, cache_key=None):
DownloadArchivePostgresql.__init__(
self, path, keygen, pragma, cache_key)
self.keys = set()
def add(self, kwdict):
self.keys.add(
kwdict.get(self._cache_key) or
self.keygen(kwdict))
def check(self, kwdict):
key = kwdict[self._cache_key] = self.keygen(kwdict)
if key in self.keys:
return True
try:
self.cursor.execute(
"SELECT true "
"FROM archive "
"WHERE entry=%s "
"LIMIT 1",
(key,))
return self.cursor.fetchone()
except Exception as exc:
log.error("%s: %s when checking entry: %s",
self.connection, exc.__class__.__name__, exc)
self.connection.rollback()
return False
def finalize(self):
if not self.keys:
return
try:
self.cursor.executemany(
"INSERT INTO archive (entry) "
"VALUES (%s) "
"ON CONFLICT DO NOTHING",
((key,) for key in self.keys))
self.connection.commit()
except Exception as exc:
log.error("%s: %s when writing entries: %s",
self.connection, exc.__class__.__name__, exc)
self.connection.rollback()

View File

@@ -551,8 +551,6 @@ class DownloadJob(Job):
archive_path = cfg("archive") archive_path = cfg("archive")
if archive_path: if archive_path:
archive_path = util.expand_path(archive_path)
archive_prefix = cfg("archive-prefix") archive_prefix = cfg("archive-prefix")
if archive_prefix is None: if archive_prefix is None:
archive_prefix = extr.category archive_prefix = extr.category
@@ -562,16 +560,11 @@ class DownloadJob(Job):
archive_format = extr.archive_fmt archive_format = extr.archive_fmt
try: try:
if "{" in archive_path: self.archive = archive.connect(
archive_path = formatter.parse(
archive_path).format_map(kwdict)
if cfg("archive-mode") == "memory":
archive_cls = archive.DownloadArchiveMemory
else:
archive_cls = archive.DownloadArchive
self.archive = archive_cls(
archive_path, archive_path,
archive_prefix + archive_format, archive_prefix,
archive_format,
cfg("archive-mode"),
cfg("archive-pragma"), cfg("archive-pragma"),
) )
except Exception as exc: except Exception as exc: