[archive] implement support for PostgreSQL databases (#6152)
This commit is contained in:
@@ -922,11 +922,13 @@ Description
|
|||||||
extractor.*.archive
|
extractor.*.archive
|
||||||
-------------------
|
-------------------
|
||||||
Type
|
Type
|
||||||
|Path|_
|
* ``string``
|
||||||
|
* |Path|_
|
||||||
Default
|
Default
|
||||||
``null``
|
``null``
|
||||||
Example
|
Example
|
||||||
``"$HOME/.archives/{category}.sqlite3"``
|
* ``"$HOME/.archives/{category}.sqlite3"``
|
||||||
|
* ``"postgresql://user:pass@host/database"``
|
||||||
Description
|
Description
|
||||||
File to store IDs of downloaded files in. Downloads of files
|
File to store IDs of downloaded files in. Downloads of files
|
||||||
already recorded in this archive file will be
|
already recorded in this archive file will be
|
||||||
@@ -937,6 +939,11 @@ Description
|
|||||||
memory requirements are significantly lower when the
|
memory requirements are significantly lower when the
|
||||||
number of stored IDs gets reasonably large.
|
number of stored IDs gets reasonably large.
|
||||||
|
|
||||||
|
If this value is a
|
||||||
|
`PostgreSQL Connection URI <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS>`__,
|
||||||
|
the archive will use this PostgreSQL database as backend (requires
|
||||||
|
`Psycopg <https://www.psycopg.org/>`__).
|
||||||
|
|
||||||
Note: Archive files that do not already exist get generated automatically.
|
Note: Archive files that do not already exist get generated automatically.
|
||||||
|
|
||||||
Note: Archive paths support regular `format string`_ replacements,
|
Note: Archive paths support regular `format string`_ replacements,
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
# Copyright 2024 Mike Fährmann
|
# Copyright 2024-2025 Mike Fährmann
|
||||||
#
|
#
|
||||||
# This program is free software; you can redistribute it and/or modify
|
# This program is free software; you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU General Public License version 2 as
|
# it under the terms of the GNU General Public License version 2 as
|
||||||
@@ -9,26 +9,55 @@
|
|||||||
"""Download Archives"""
|
"""Download Archives"""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import sqlite3
|
import logging
|
||||||
from . import formatter
|
from . import util, formatter
|
||||||
|
|
||||||
|
log = logging.getLogger("archive")
|
||||||
|
|
||||||
|
|
||||||
|
def connect(path, prefix, format, mode=None, pragma=None, kwdict=None):
    """Create and return the download archive object selected by 'path'

    A 'path' starting with "postgres://" or "postgresql://" is handed
    over unchanged to one of the PostgreSQL archive classes.
    Any other 'path' goes through util.expand_path() and, if 'kwdict'
    is given and the expanded path contains a "{", through format string
    replacement with 'kwdict' before it is handed to one of the
    sqlite3-based archive classes.

    A 'mode' of "memory" selects the '...Memory' variant of either kind.
    Archive keys are built from the format string 'prefix + format'.
    """
    # build the key generator before touching 'path'
    keygen = formatter.parse(prefix + format).format_map
    in_memory = (mode == "memory")

    if path.startswith(("postgres://", "postgresql://")):
        cls = (DownloadArchivePostgresqlMemory if in_memory else
               DownloadArchivePostgresql)
        return cls(path, keygen, pragma)

    path = util.expand_path(path)
    if kwdict is not None and "{" in path:
        path = formatter.parse(path).format_map(kwdict)

    cls = DownloadArchiveMemory if in_memory else DownloadArchive
    return cls(path, keygen, pragma)
|
||||||
|
|
||||||
|
|
||||||
class DownloadArchive():
|
class DownloadArchive():
|
||||||
|
_sqlite3 = None
|
||||||
|
|
||||||
|
def __init__(self, path, keygen, pragma=None, cache_key=None):
|
||||||
|
|
||||||
|
if self._sqlite3 is None:
|
||||||
|
import sqlite3
|
||||||
|
DownloadArchive._sqlite3 = sqlite3
|
||||||
|
|
||||||
def __init__(self, path, format_string, pragma=None,
|
|
||||||
cache_key="_archive_key"):
|
|
||||||
try:
|
try:
|
||||||
con = sqlite3.connect(path, timeout=60, check_same_thread=False)
|
con = self._sqlite3.connect(
|
||||||
|
path, timeout=60, check_same_thread=False)
|
||||||
except sqlite3.OperationalError:
|
except sqlite3.OperationalError:
|
||||||
os.makedirs(os.path.dirname(path))
|
os.makedirs(os.path.dirname(path))
|
||||||
con = sqlite3.connect(path, timeout=60, check_same_thread=False)
|
con = self._sqlite3.connect(
|
||||||
|
path, timeout=60, check_same_thread=False)
|
||||||
con.isolation_level = None
|
con.isolation_level = None
|
||||||
|
|
||||||
self.keygen = formatter.parse(format_string).format_map
|
self.keygen = keygen
|
||||||
self.connection = con
|
self.connection = con
|
||||||
self.close = con.close
|
self.close = con.close
|
||||||
self.cursor = cursor = con.cursor()
|
self.cursor = cursor = con.cursor()
|
||||||
self._cache_key = cache_key
|
self._cache_key = cache_key or "_archive_key"
|
||||||
|
|
||||||
if pragma:
|
if pragma:
|
||||||
for stmt in pragma:
|
for stmt in pragma:
|
||||||
@@ -37,7 +66,7 @@ class DownloadArchive():
|
|||||||
try:
|
try:
|
||||||
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
|
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
|
||||||
"(entry TEXT PRIMARY KEY) WITHOUT ROWID")
|
"(entry TEXT PRIMARY KEY) WITHOUT ROWID")
|
||||||
except sqlite3.OperationalError:
|
except self._sqlite3.OperationalError:
|
||||||
# fallback for missing WITHOUT ROWID support (#553)
|
# fallback for missing WITHOUT ROWID support (#553)
|
||||||
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
|
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
|
||||||
"(entry TEXT PRIMARY KEY)")
|
"(entry TEXT PRIMARY KEY)")
|
||||||
@@ -61,9 +90,9 @@ class DownloadArchive():
|
|||||||
|
|
||||||
class DownloadArchiveMemory(DownloadArchive):
|
class DownloadArchiveMemory(DownloadArchive):
|
||||||
|
|
||||||
def __init__(self, path, format_string, pragma=None,
|
def __init__(self, path, keygen, pragma=None, cache_key=None):
|
||||||
cache_key="_archive_key"):
|
DownloadArchive.__init__(
|
||||||
DownloadArchive.__init__(self, path, format_string, pragma, cache_key)
|
self, path, keygen, pragma, cache_key)
|
||||||
self.keys = set()
|
self.keys = set()
|
||||||
|
|
||||||
def add(self, kwdict):
|
def add(self, kwdict):
|
||||||
@@ -87,7 +116,7 @@ class DownloadArchiveMemory(DownloadArchive):
|
|||||||
with self.connection:
|
with self.connection:
|
||||||
try:
|
try:
|
||||||
cursor.execute("BEGIN")
|
cursor.execute("BEGIN")
|
||||||
except sqlite3.OperationalError:
|
except self._sqlite3.OperationalError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
stmt = "INSERT OR IGNORE INTO archive (entry) VALUES (?)"
|
stmt = "INSERT OR IGNORE INTO archive (entry) VALUES (?)"
|
||||||
@@ -96,3 +125,107 @@ class DownloadArchiveMemory(DownloadArchive):
|
|||||||
cursor.execute(stmt, (key,))
|
cursor.execute(stmt, (key,))
|
||||||
else:
|
else:
|
||||||
cursor.executemany(stmt, ((key,) for key in self.keys))
|
cursor.executemany(stmt, ((key,) for key in self.keys))
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadArchivePostgresql():
    """Download archive backed by the 'archive' table of a PostgreSQL DB"""
    # psycopg module object; imported on first instantiation
    _psycopg = None

    def __init__(self, uri, keygen, pragma=None, cache_key=None):
        """Connect to 'uri' and make sure the 'archive' table exists

        'keygen' is a callable turning a kwdict into an archive key.
        'cache_key' is the kwdict key under which check() stores the
        generated archive key; it defaults to "_archive_key".
        'pragma' is not used by this class.

        Errors while creating the table are logged and re-raised
        after rolling back.
        """
        if self._psycopg is None:
            import psycopg
            DownloadArchivePostgresql._psycopg = psycopg

        connection = self._psycopg.connect(uri)
        self.connection = connection
        self.cursor = connection.cursor()
        self.close = connection.close
        self.keygen = keygen
        self._cache_key = cache_key if cache_key else "_archive_key"

        try:
            self.cursor.execute(
                "CREATE TABLE IF NOT EXISTS archive (entry TEXT PRIMARY KEY)")
            connection.commit()
        except Exception as exc:
            log.error("%s: %s when creating 'archive' table: %s",
                      connection, exc.__class__.__name__, exc)
            connection.rollback()
            raise

    def add(self, kwdict):
        """Insert the archive key of 'kwdict' and commit

        Uses the key stored by a previous check() call if there is one.
        Errors are logged and the transaction is rolled back.
        """
        key = kwdict.get(self._cache_key)
        if not key:
            key = self.keygen(kwdict)

        stmt = "INSERT INTO archive (entry) VALUES (%s) ON CONFLICT DO NOTHING"
        try:
            self.cursor.execute(stmt, (key,))
            self.connection.commit()
        except Exception as exc:
            log.error("%s: %s when writing entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()

    def check(self, kwdict):
        """Look up the archive key of 'kwdict' in the database

        The generated key gets stored in 'kwdict' under the cache key.
        Returns the fetched row (None if there is no matching entry),
        or False if the query raised an exception.
        """
        key = self.keygen(kwdict)
        kwdict[self._cache_key] = key

        stmt = "SELECT true FROM archive WHERE entry=%s LIMIT 1"
        try:
            cursor = self.cursor
            cursor.execute(stmt, (key,))
            return cursor.fetchone()
        except Exception as exc:
            log.error("%s: %s when checking entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()
            return False

    def finalize(self):
        """Nothing to do; add() commits every entry right away"""
        pass
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
    """PostgreSQL archive that collects new keys in memory

    add() only records keys in a set;
    finalize() writes all of them to the database in one go.
    """

    def __init__(self, path, keygen, pragma=None, cache_key=None):
        """Initialize the PostgreSQL archive and an empty set of new keys"""
        DownloadArchivePostgresql.__init__(
            self, path, keygen, pragma, cache_key)
        self.keys = set()

    def add(self, kwdict):
        """Remember the archive key of 'kwdict' until finalize()"""
        self.keys.add(
            kwdict.get(self._cache_key) or self.keygen(kwdict))

    def check(self, kwdict):
        """Check the pending keys first, then the database

        The generated key gets stored in 'kwdict' under the cache key.
        Returns True for a pending key, otherwise the fetched row
        (None if there is no matching entry), or False if the query
        raised an exception.
        """
        key = self.keygen(kwdict)
        kwdict[self._cache_key] = key

        if key in self.keys:
            return True

        stmt = "SELECT true FROM archive WHERE entry=%s LIMIT 1"
        try:
            cursor = self.cursor
            cursor.execute(stmt, (key,))
            return cursor.fetchone()
        except Exception as exc:
            log.error("%s: %s when checking entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()
            return False

    def finalize(self):
        """Write all pending keys to the database and commit

        Does nothing when no keys were added.
        Errors are logged and the transaction is rolled back.
        """
        pending = self.keys
        if not pending:
            return

        stmt = "INSERT INTO archive (entry) VALUES (%s) ON CONFLICT DO NOTHING"
        try:
            self.cursor.executemany(stmt, ((key,) for key in pending))
            self.connection.commit()
        except Exception as exc:
            log.error("%s: %s when writing entries: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()
|
||||||
|
|||||||
@@ -551,8 +551,6 @@ class DownloadJob(Job):
|
|||||||
|
|
||||||
archive_path = cfg("archive")
|
archive_path = cfg("archive")
|
||||||
if archive_path:
|
if archive_path:
|
||||||
archive_path = util.expand_path(archive_path)
|
|
||||||
|
|
||||||
archive_prefix = cfg("archive-prefix")
|
archive_prefix = cfg("archive-prefix")
|
||||||
if archive_prefix is None:
|
if archive_prefix is None:
|
||||||
archive_prefix = extr.category
|
archive_prefix = extr.category
|
||||||
@@ -562,16 +560,11 @@ class DownloadJob(Job):
|
|||||||
archive_format = extr.archive_fmt
|
archive_format = extr.archive_fmt
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if "{" in archive_path:
|
self.archive = archive.connect(
|
||||||
archive_path = formatter.parse(
|
|
||||||
archive_path).format_map(kwdict)
|
|
||||||
if cfg("archive-mode") == "memory":
|
|
||||||
archive_cls = archive.DownloadArchiveMemory
|
|
||||||
else:
|
|
||||||
archive_cls = archive.DownloadArchive
|
|
||||||
self.archive = archive_cls(
|
|
||||||
archive_path,
|
archive_path,
|
||||||
archive_prefix + archive_format,
|
archive_prefix,
|
||||||
|
archive_format,
|
||||||
|
cfg("archive-mode"),
|
||||||
cfg("archive-pragma"),
|
cfg("archive-pragma"),
|
||||||
)
|
)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
|||||||
Reference in New Issue
Block a user