[archive] add 'archive-table' option (#6152)

This commit is contained in:
Mike Fährmann
2025-02-17 11:01:35 +01:00
parent dac0c4ac10
commit 8daf496a22
4 changed files with 69 additions and 42 deletions

View File

@@ -1001,7 +1001,8 @@ extractor.*.archive-prefix
Type Type
``string`` ``string``
Default Default
``"{category}"`` * ``""`` when `archive-table <extractor.*.archive-table_>`__ is set
* ``"{category}"`` otherwise
Description Description
Prefix for archive IDs. Prefix for archive IDs.
@@ -1019,6 +1020,18 @@ Description
for available ``PRAGMA`` statements and further details. for available ``PRAGMA`` statements and further details.
extractor.*.archive-table
-------------------------
Type
``string``
Default
``"archive"``
Example
``"{category}"``
Description
`Format string`_ selecting the archive database table name.
extractor.*.actions extractor.*.actions
------------------- -------------------
Type Type

View File

@@ -37,6 +37,7 @@
"archive-pragma": [], "archive-pragma": [],
"archive-event" : ["file"], "archive-event" : ["file"],
"archive-mode" : "file", "archive-mode" : "file",
"archive-table" : null,
"cookies": null, "cookies": null,
"cookies-select": null, "cookies-select": null,

View File

@@ -15,7 +15,8 @@ from . import util, formatter
log = logging.getLogger("archive") log = logging.getLogger("archive")
def connect(path, prefix, format, mode=None, pragma=None, kwdict=None): def connect(path, prefix, format,
table=None, mode=None, pragma=None, kwdict=None):
keygen = formatter.parse(prefix + format).format_map keygen = formatter.parse(prefix + format).format_map
if path.startswith(("postgres://", "postgresql://")): if path.startswith(("postgres://", "postgresql://")):
@@ -32,13 +33,20 @@ def connect(path, prefix, format, mode=None, pragma=None, kwdict=None):
else: else:
cls = DownloadArchive cls = DownloadArchive
return cls(path, keygen, pragma) if kwdict is not None and table:
table = formatter.parse(table).format_map(kwdict)
return cls(path, keygen, table, pragma)
def sanitize(name):
    """Return 'name' enclosed in double quotes

    Every double quote character inside 'name' is replaced
    with an underscore before the surrounding quotes are added.
    """
    cleaned = "_".join(name.split('"'))
    return '"%s"' % cleaned
class DownloadArchive(): class DownloadArchive():
_sqlite3 = None _sqlite3 = None
def __init__(self, path, keygen, pragma=None, cache_key=None): def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
if self._sqlite3 is None: if self._sqlite3 is None:
import sqlite3 import sqlite3
@@ -59,29 +67,37 @@ class DownloadArchive():
self.cursor = cursor = con.cursor() self.cursor = cursor = con.cursor()
self._cache_key = cache_key or "_archive_key" self._cache_key = cache_key or "_archive_key"
table = "archive" if table is None else sanitize(table)
self._stmt_select = (
"SELECT 1 "
"FROM " + table + " "
"WHERE entry=? "
"LIMIT 1")
self._stmt_insert = (
"INSERT OR IGNORE INTO " + table + " "
"(entry) VALUES (?)")
if pragma: if pragma:
for stmt in pragma: for stmt in pragma:
cursor.execute("PRAGMA " + stmt) cursor.execute("PRAGMA " + stmt)
try: try:
cursor.execute("CREATE TABLE IF NOT EXISTS archive " cursor.execute("CREATE TABLE IF NOT EXISTS " + table + " "
"(entry TEXT PRIMARY KEY) WITHOUT ROWID") "(entry TEXT PRIMARY KEY) WITHOUT ROWID")
except self._sqlite3.OperationalError: except self._sqlite3.OperationalError:
# fallback for missing WITHOUT ROWID support (#553) # fallback for missing WITHOUT ROWID support (#553)
cursor.execute("CREATE TABLE IF NOT EXISTS archive " cursor.execute("CREATE TABLE IF NOT EXISTS " + table + " "
"(entry TEXT PRIMARY KEY)") "(entry TEXT PRIMARY KEY)")
def add(self, kwdict): def add(self, kwdict):
"""Add item described by 'kwdict' to archive""" """Add item described by 'kwdict' to archive"""
key = kwdict.get(self._cache_key) or self.keygen(kwdict) key = kwdict.get(self._cache_key) or self.keygen(kwdict)
self.cursor.execute( self.cursor.execute(self._stmt_insert, (key,))
"INSERT OR IGNORE INTO archive (entry) VALUES (?)", (key,))
def check(self, kwdict): def check(self, kwdict):
"""Return True if the item described by 'kwdict' exists in archive""" """Return True if the item described by 'kwdict' exists in archive"""
key = kwdict[self._cache_key] = self.keygen(kwdict) key = kwdict[self._cache_key] = self.keygen(kwdict)
self.cursor.execute( self.cursor.execute(self._stmt_select, (key,))
"SELECT 1 FROM archive WHERE entry=? LIMIT 1", (key,))
return self.cursor.fetchone() return self.cursor.fetchone()
def finalize(self): def finalize(self):
@@ -90,9 +106,9 @@ class DownloadArchive():
class DownloadArchiveMemory(DownloadArchive): class DownloadArchiveMemory(DownloadArchive):
def __init__(self, path, keygen, pragma=None, cache_key=None): def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
DownloadArchive.__init__( DownloadArchive.__init__(
self, path, keygen, pragma, cache_key) self, path, keygen, table, pragma, cache_key)
self.keys = set() self.keys = set()
def add(self, kwdict): def add(self, kwdict):
@@ -104,8 +120,7 @@ class DownloadArchiveMemory(DownloadArchive):
key = kwdict[self._cache_key] = self.keygen(kwdict) key = kwdict[self._cache_key] = self.keygen(kwdict)
if key in self.keys: if key in self.keys:
return True return True
self.cursor.execute( self.cursor.execute(self._stmt_select, (key,))
"SELECT 1 FROM archive WHERE entry=? LIMIT 1", (key,))
return self.cursor.fetchone() return self.cursor.fetchone()
def finalize(self): def finalize(self):
@@ -119,7 +134,7 @@ class DownloadArchiveMemory(DownloadArchive):
except self._sqlite3.OperationalError: except self._sqlite3.OperationalError:
pass pass
stmt = "INSERT OR IGNORE INTO archive (entry) VALUES (?)" stmt = self._stmt_insert
if len(self.keys) < 100: if len(self.keys) < 100:
for key in self.keys: for key in self.keys:
cursor.execute(stmt, (key,)) cursor.execute(stmt, (key,))
@@ -130,7 +145,7 @@ class DownloadArchiveMemory(DownloadArchive):
class DownloadArchivePostgresql(): class DownloadArchivePostgresql():
_psycopg = None _psycopg = None
def __init__(self, uri, keygen, pragma=None, cache_key=None): def __init__(self, uri, keygen, table=None, pragma=None, cache_key=None):
if self._psycopg is None: if self._psycopg is None:
import psycopg import psycopg
DownloadArchivePostgresql._psycopg = psycopg DownloadArchivePostgresql._psycopg = psycopg
@@ -141,24 +156,31 @@ class DownloadArchivePostgresql():
self.keygen = keygen self.keygen = keygen
self._cache_key = cache_key or "_archive_key" self._cache_key = cache_key or "_archive_key"
table = "archive" if table is None else sanitize(table)
self._stmt_select = (
"SELECT true "
"FROM " + table + " "
"WHERE entry=%s "
"LIMIT 1")
self._stmt_insert = (
"INSERT INTO " + table + " (entry) "
"VALUES (%s) "
"ON CONFLICT DO NOTHING")
try: try:
cursor.execute("CREATE TABLE IF NOT EXISTS archive " cursor.execute("CREATE TABLE IF NOT EXISTS " + table + " "
"(entry TEXT PRIMARY KEY)") "(entry TEXT PRIMARY KEY)")
con.commit() con.commit()
except Exception as exc: except Exception as exc:
log.error("%s: %s when creating 'archive' table: %s", log.error("%s: %s when creating '%s' table: %s",
con, exc.__class__.__name__, exc) con, exc.__class__.__name__, table, exc)
con.rollback() con.rollback()
raise raise
def add(self, kwdict): def add(self, kwdict):
key = kwdict.get(self._cache_key) or self.keygen(kwdict) key = kwdict.get(self._cache_key) or self.keygen(kwdict)
try: try:
self.cursor.execute( self.cursor.execute(self._stmt_insert, (key,))
"INSERT INTO archive (entry) "
"VALUES (%s) "
"ON CONFLICT DO NOTHING",
(key,))
self.connection.commit() self.connection.commit()
except Exception as exc: except Exception as exc:
log.error("%s: %s when writing entry: %s", log.error("%s: %s when writing entry: %s",
@@ -168,12 +190,7 @@ class DownloadArchivePostgresql():
def check(self, kwdict): def check(self, kwdict):
key = kwdict[self._cache_key] = self.keygen(kwdict) key = kwdict[self._cache_key] = self.keygen(kwdict)
try: try:
self.cursor.execute( self.cursor.execute(self._stmt_select, (key,))
"SELECT true "
"FROM archive "
"WHERE entry=%s "
"LIMIT 1",
(key,))
return self.cursor.fetchone() return self.cursor.fetchone()
except Exception as exc: except Exception as exc:
log.error("%s: %s when checking entry: %s", log.error("%s: %s when checking entry: %s",
@@ -187,9 +204,9 @@ class DownloadArchivePostgresql():
class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql): class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
def __init__(self, path, keygen, pragma=None, cache_key=None): def __init__(self, path, keygen, table=None, pragma=None, cache_key=None):
DownloadArchivePostgresql.__init__( DownloadArchivePostgresql.__init__(
self, path, keygen, pragma, cache_key) self, path, keygen, table, pragma, cache_key)
self.keys = set() self.keys = set()
def add(self, kwdict): def add(self, kwdict):
@@ -202,12 +219,7 @@ class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
if key in self.keys: if key in self.keys:
return True return True
try: try:
self.cursor.execute( self.cursor.execute(self._stmt_select, (key,))
"SELECT true "
"FROM archive "
"WHERE entry=%s "
"LIMIT 1",
(key,))
return self.cursor.fetchone() return self.cursor.fetchone()
except Exception as exc: except Exception as exc:
log.error("%s: %s when checking entry: %s", log.error("%s: %s when checking entry: %s",
@@ -220,9 +232,7 @@ class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
return return
try: try:
self.cursor.executemany( self.cursor.executemany(
"INSERT INTO archive (entry) " self._stmt_insert,
"VALUES (%s) "
"ON CONFLICT DO NOTHING",
((key,) for key in self.keys)) ((key,) for key in self.keys))
self.connection.commit() self.connection.commit()
except Exception as exc: except Exception as exc:

View File

@@ -551,9 +551,10 @@ class DownloadJob(Job):
archive_path = cfg("archive") archive_path = cfg("archive")
if archive_path: if archive_path:
archive_table = cfg("archive-table")
archive_prefix = cfg("archive-prefix") archive_prefix = cfg("archive-prefix")
if archive_prefix is None: if archive_prefix is None:
archive_prefix = extr.category archive_prefix = extr.category if archive_table is None else ""
archive_format = cfg("archive-format") archive_format = cfg("archive-format")
if archive_format is None: if archive_format is None:
@@ -564,8 +565,10 @@ class DownloadJob(Job):
archive_path, archive_path,
archive_prefix, archive_prefix,
archive_format, archive_format,
archive_table,
cfg("archive-mode"), cfg("archive-mode"),
cfg("archive-pragma"), cfg("archive-pragma"),
kwdict,
) )
except Exception as exc: except Exception as exc:
extr.log.warning( extr.log.warning(