[dt] replace 'util' datetime functions

This commit is contained in:
Mike Fährmann
2025-10-15 21:30:38 +02:00
parent d0f5d4e0d3
commit 0eb3c8a994
13 changed files with 62 additions and 128 deletions

View File

@@ -7,7 +7,7 @@
"""Extractors for https://4archive.org/"""
from .common import Extractor, Message
from .. import text, util
from .. import text, dt
class _4archiveThreadExtractor(Extractor):
@@ -37,7 +37,7 @@ class _4archiveThreadExtractor(Extractor):
for post in posts:
post.update(data)
post["time"] = int(util.datetime_to_timestamp(post["date"]))
post["time"] = int(dt.to_ts(post["date"]))
yield Message.Directory, post
if "url" in post:
yield Message.Url, post["url"], text.nameext_from_url(

View File

@@ -9,9 +9,8 @@
"""Extractors for https://8chan.moe/"""
from .common import Extractor, Message
from .. import text, util
from .. import text, dt
from ..cache import memcache
from datetime import timedelta
import itertools
BASE_PATTERN = r"(?:https?://)?8chan\.(moe|se|cc)"
@@ -44,7 +43,7 @@ class _8chanExtractor(Extractor):
def cookies_prepare(self):
# fetch captcha cookies
# (necessary to download without getting interrupted)
now = util.datetime_utcnow()
now = dt.now()
url = self.root + "/captcha.js"
params = {"d": now.strftime("%a %b %d %Y %H:%M:%S GMT+0000 (UTC)")}
self.request(url, params=params).content
@@ -57,7 +56,7 @@ class _8chanExtractor(Extractor):
if cookie.domain.endswith(domain):
cookie.expires = None
if cookie.name == "captchaexpiration":
cookie.value = (now + timedelta(30, 300)).strftime(
cookie.value = (now + dt.timedelta(30, 300)).strftime(
"%a, %d %b %Y %H:%M:%S GMT")
return self.cookies

View File

@@ -23,7 +23,7 @@ from datetime import datetime
from xml.etree import ElementTree
from requests.adapters import HTTPAdapter
from .message import Message
from .. import config, output, text, util, cache, exception
from .. import config, output, text, util, dt, cache, exception
urllib3 = requests.packages.urllib3
@@ -315,7 +315,7 @@ class Extractor():
elif until:
if isinstance(until, datetime):
# convert to UTC timestamp
until = util.datetime_to_timestamp(until)
until = dt.to_ts(until)
else:
until = float(until)
seconds = until - now

View File

@@ -9,7 +9,7 @@
"""Extractors for https://www.deviantart.com/"""
from .common import Extractor, Message, Dispatch
from .. import text, util, exception
from .. import text, util, dt, exception
from ..cache import cache, memcache
import collections
import mimetypes
@@ -1187,8 +1187,8 @@ class DeviantartStatusExtractor(DeviantartExtractor):
deviation["username"] = deviation["author"]["username"]
deviation["_username"] = deviation["username"].lower()
deviation["date"] = dt = text.parse_datetime(deviation["ts"])
deviation["published_time"] = int(util.datetime_to_timestamp(dt))
deviation["date"] = d = text.parse_datetime(deviation["ts"])
deviation["published_time"] = int(dt.to_ts(d))
deviation["da_category"] = "Status"
deviation["category_path"] = "status"

View File

@@ -9,9 +9,8 @@
"""Extractors for https://motherless.com/"""
from .common import Extractor, Message
from .. import text, util, exception
from .. import text, dt, exception
from ..cache import memcache
from datetime import timedelta
BASE_PATTERN = r"(?:https?://)?motherless\.com"
@@ -115,14 +114,14 @@ class MotherlessExtractor(Extractor):
return data
def _parse_datetime(self, dt):
if " ago" not in dt:
return text.parse_datetime(dt, "%d %b %Y")
def _parse_datetime(self, dt_string):
if " ago" not in dt_string:
return dt.parse(dt_string, "%d %b %Y")
value = text.parse_int(dt[:-5])
delta = timedelta(0, value*3600) if dt[-5] == "h" else timedelta(value)
return (util.datetime_utcnow() - delta).replace(
hour=0, minute=0, second=0)
value = text.parse_int(dt_string[:-5])
delta = (dt.timedelta(0, value*3600) if dt_string[-5] == "h" else
dt.timedelta(value))
return (dt.now() - delta).replace(hour=0, minute=0, second=0)
@memcache(keyarg=2)
def _extract_gallery_title(self, page, gallery_id):

View File

@@ -9,7 +9,7 @@
"""Extractors for https://www.patreon.com/"""
from .common import Extractor, Message
from .. import text, util, exception
from .. import text, util, dt, exception
from ..cache import memcache
import collections
import itertools
@@ -445,8 +445,7 @@ class PatreonUserExtractor(PatreonExtractor):
def posts(self):
if date_max := self._get_date_min_max(None, None)[1]:
self._cursor = cursor = \
util.datetime_from_timestamp(date_max).isoformat()
self._cursor = cursor = dt.from_ts(date_max).isoformat()
self._init_cursor = lambda: cursor
url = self._build_url("stream", (

View File

@@ -9,9 +9,8 @@
"""Extractors for https://www.pixiv.net/"""
from .common import Extractor, Message, Dispatch
from .. import text, util, exception
from .. import text, util, dt, exception
from ..cache import cache, memcache
from datetime import datetime, timedelta
import itertools
import hashlib
@@ -96,7 +95,7 @@ class PixivExtractor(Extractor):
if transform_tags:
transform_tags(work)
work["num"] = 0
work["date"] = text.parse_datetime(work["create_date"])
work["date"] = dt.parse_iso(work["create_date"])
work["rating"] = ratings.get(work["x_restrict"])
work["suffix"] = ""
work.update(metadata)
@@ -350,10 +349,10 @@ class PixivExtractor(Extractor):
if fmt in urls:
yield urls[fmt]
def _date_from_url(self, url, offset=timedelta(hours=9)):
def _date_from_url(self, url, offset=dt.timedelta(hours=9)):
try:
_, _, _, _, _, y, m, d, H, M, S, _ = url.split("/")
return datetime(
return dt.datetime(
int(y), int(m), int(d), int(H), int(M), int(S)) - offset
except Exception:
return None
@@ -712,8 +711,7 @@ class PixivRankingExtractor(PixivExtractor):
self.log.warning("invalid date '%s'", date)
date = None
if not date:
now = util.datetime_utcnow()
date = (now - timedelta(days=1)).strftime("%Y-%m-%d")
date = (dt.now() - dt.timedelta(days=1)).strftime("%Y-%m-%d")
self.date = date
self.type = type = query.get("content")
@@ -888,8 +886,7 @@ class PixivSketchExtractor(Extractor):
for post in self.posts():
media = post["media"]
post["post_id"] = post["id"]
post["date"] = text.parse_datetime(
post["created_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
post["date"] = dt.parse_iso(post["created_at"])
util.delete_items(post, ("id", "media", "_links"))
yield Message.Directory, post
@@ -969,7 +966,7 @@ class PixivNovelExtractor(PixivExtractor):
if transform_tags:
transform_tags(novel)
novel["num"] = 0
novel["date"] = text.parse_datetime(novel["create_date"])
novel["date"] = dt.parse_iso(novel["create_date"])
novel["rating"] = ratings.get(novel["x_restrict"])
novel["suffix"] = ""
@@ -1151,7 +1148,7 @@ class PixivAppAPI():
"get_secure_url": "1",
}
time = util.datetime_utcnow().strftime("%Y-%m-%dT%H:%M:%S+00:00")
time = dt.now().strftime("%Y-%m-%dT%H:%M:%S+00:00")
headers = {
"X-Client-Time": time,
"X-Client-Hash": hashlib.md5(
@@ -1326,11 +1323,11 @@ class PixivAppAPI():
sort = params["sort"]
if sort == "date_desc":
date_key = "end_date"
date_off = timedelta(days=1)
date_off = dt.timedelta(days=1)
date_cmp = lambda lhs, rhs: lhs >= rhs # noqa E731
elif sort == "date_asc":
date_key = "start_date"
date_off = timedelta(days=-1)
date_off = dt.timedelta(days=-1)
date_cmp = lambda lhs, rhs: lhs <= rhs # noqa E731
else:
date_key = None
@@ -1357,8 +1354,8 @@ class PixivAppAPI():
if date_key and text.parse_int(params.get("offset")) >= 5000:
date_last = data["illusts"][-1]["create_date"]
date_val = (text.parse_datetime(
date_last) + date_off).strftime("%Y-%m-%d")
date_val = (dt.parse_iso(date_last) + date_off).strftime(
"%Y-%m-%d")
self.log.info("Reached 'offset' >= 5000; "
"Updating '%s' to '%s'", date_key, date_val)
params[date_key] = date_val

View File

@@ -15,7 +15,7 @@ import string
import _string
import datetime
import operator
from . import text, util
from . import text, util, dt
NONE = util.NONE
@@ -68,8 +68,8 @@ class StringFormatter():
- "g": calls text.slugify()
- "j": calls json.dumps
- "t": calls str.strip
- "T": calls util.datetime_to_timestamp_string()
- "d": calls text.parse_timestamp
- "T": calls dt.to_ts_string()
- "d": calls dt.parse_ts()
- "s": calls str()
- "S": calls util.to_string()
- "U": calls urllib.parse.unescape
@@ -471,9 +471,9 @@ def _parse_datetime(format_spec, default):
dt_format = dt_format[1:]
fmt = _build_format_func(format_spec, default)
def dt(obj):
return fmt(text.parse_datetime(obj, dt_format))
return dt
def dt_parse(obj):
return fmt(dt.parse(obj, dt_format))
return dt_parse
def _parse_offset(format_spec, default):
@@ -482,9 +482,9 @@ def _parse_offset(format_spec, default):
fmt = _build_format_func(format_spec, default)
if not offset or offset == "local":
def off(dt):
local = time.localtime(util.datetime_to_timestamp(dt))
return fmt(dt + datetime.timedelta(0, local.tm_gmtoff))
def off(dt_utc):
local = time.localtime(dt.to_ts(dt_utc))
return fmt(dt_utc + datetime.timedelta(0, local.tm_gmtoff))
else:
hours, _, minutes = offset.partition(":")
offset = 3600 * int(hours)
@@ -569,9 +569,9 @@ _CONVERSIONS = {
"t": str.strip,
"n": len,
"L": util.code_to_language,
"T": util.datetime_to_timestamp_string,
"d": text.parse_timestamp,
"D": util.to_datetime,
"T": dt.to_ts_string,
"d": dt.parse_ts,
"D": dt.convert,
"U": text.unescape,
"H": lambda s: text.unescape(text.remove_html(s)),
"g": text.slugify,

View File

@@ -9,8 +9,7 @@
"""Use metadata as file modification time"""
from .common import PostProcessor
from .. import text, util, formatter
from datetime import datetime
from .. import text, util, dt, formatter
class MtimePP(PostProcessor):
@@ -36,8 +35,8 @@ class MtimePP(PostProcessor):
return
pathfmt.kwdict["_mtime_meta"] = (
util.datetime_to_timestamp(mtime)
if isinstance(mtime, datetime) else
dt.to_ts(mtime)
if isinstance(mtime, dt.datetime) else
text.parse_int(mtime)
)

View File

@@ -16,7 +16,6 @@ import random
import getpass
import hashlib
import binascii
import datetime
import functools
import itertools
import subprocess
@@ -24,7 +23,7 @@ import collections
import urllib.parse
from http.cookiejar import Cookie
from email.utils import mktime_tz, parsedate_tz
from . import text, version, exception
from . import text, dt, version, exception
def bencode(num, alphabet="0123456789"):
@@ -228,63 +227,6 @@ def to_string(value):
return str(value)
def to_datetime(value):
"""Convert 'value' to a datetime object"""
if not value:
return EPOCH
if isinstance(value, datetime.datetime):
return value
if isinstance(value, str):
try:
if value[-1] == "Z":
# compat for Python < 3.11
value = value[:-1]
dt = datetime.datetime.fromisoformat(value)
if dt.tzinfo is None:
if dt.microsecond:
dt = dt.replace(microsecond=0)
else:
# convert to naive UTC
dt = dt.astimezone(datetime.timezone.utc).replace(
microsecond=0, tzinfo=None)
return dt
except Exception:
pass
return text.parse_timestamp(value, EPOCH)
def datetime_to_timestamp(dt):
"""Convert naive UTC datetime to Unix timestamp"""
return (dt - EPOCH) / SECOND
def datetime_to_timestamp_string(dt):
"""Convert naive UTC datetime to Unix timestamp string"""
try:
return str((dt - EPOCH) // SECOND)
except Exception:
return ""
if sys.hexversion < 0x30c0000:
# Python <= 3.11
datetime_utcfromtimestamp = datetime.datetime.utcfromtimestamp
datetime_utcnow = datetime.datetime.utcnow
datetime_from_timestamp = datetime_utcfromtimestamp
else:
# Python >= 3.12
def datetime_from_timestamp(ts=None):
"""Convert Unix timestamp to naive UTC datetime"""
Y, m, d, H, M, S, _, _, _ = time.gmtime(ts)
return datetime.datetime(Y, m, d, H, M, S)
datetime_utcfromtimestamp = datetime_from_timestamp
datetime_utcnow = datetime_from_timestamp
def json_default(obj):
if isinstance(obj, CustomNone):
return None
@@ -379,7 +321,7 @@ def extract_headers(response):
text.nameext_from_url(name, data)
if hlm := headers.get("last-modified"):
data["date"] = datetime.datetime(*parsedate_tz(hlm)[:6])
data["date"] = dt.datetime(*parsedate_tz(hlm)[:6])
return data
@@ -751,11 +693,11 @@ class Flags():
# 735506 == 739342 - 137 * 28
# v135.0 release of Chrome on 2025-04-01 has ordinal 739342
# 735562 == 739342 - 135 * 28
# _ord_today = datetime.date.today().toordinal()
# _ord_today = dt.date.today().toordinal()
# _ff_ver = (_ord_today - 735506) // 28
# _ch_ver = (_ord_today - 735562) // 28
_ff_ver = (datetime.date.today().toordinal() - 735506) // 28
_ff_ver = (dt.date.today().toordinal() - 735506) // 28
# _ch_ver = _ff_ver - 2
re = text.re
@@ -763,8 +705,6 @@ re_compile = text.re_compile
NONE = CustomNone()
FLAGS = Flags()
EPOCH = datetime.datetime(1970, 1, 1)
SECOND = datetime.timedelta(0, 1)
WINDOWS = (os.name == "nt")
SENTINEL = object()
EXECUTABLE = getattr(sys, "frozen", False)
@@ -786,8 +726,8 @@ GLOBALS = {
"contains" : contains,
"parse_int": text.parse_int,
"urlsplit" : urllib.parse.urlsplit,
"datetime" : datetime.datetime,
"timedelta": datetime.timedelta,
"datetime" : dt.datetime,
"timedelta": dt.timedelta,
"abort" : raises(exception.StopExtraction),
"error" : raises(exception.AbortExtraction),
"terminate": raises(exception.TerminateExtraction),

View File

@@ -668,6 +668,8 @@ __tests__ = (
"#class" : pixiv.PixivSketchExtractor,
"#pattern" : r"https://img\-sketch\.pixiv\.net/uploads/medium/file/\d+/\d+\.(jpg|png)",
"#count" : ">= 35",
"date": "type:datetime",
},
)

View File

@@ -14,10 +14,9 @@ from unittest.mock import patch
import time
import string
from datetime import datetime, timedelta
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from gallery_dl import extractor, util # noqa E402
from gallery_dl import extractor, util, dt # noqa E402
from gallery_dl.extractor import mastodon # noqa E402
from gallery_dl.extractor.common import Extractor, Message # noqa E402
from gallery_dl.extractor.directlink import DirectlinkExtractor # noqa E402
@@ -233,8 +232,8 @@ class TestExtractorWait(unittest.TestCase):
def test_wait_until_datetime(self):
extr = extractor.find("generic:https://example.org/")
until = util.datetime_utcnow() + timedelta(seconds=5)
until_local = datetime.now() + timedelta(seconds=5)
until = dt.now() + dt.timedelta(seconds=5)
until_local = dt.datetime.now() + dt.timedelta(seconds=5)
if not until.microsecond:
until = until.replace(microsecond=until_local.microsecond)
@@ -251,8 +250,8 @@ class TestExtractorWait(unittest.TestCase):
self._assert_isotime(calls[0][1][1], until_local)
def _assert_isotime(self, output, until):
if not isinstance(until, datetime):
until = datetime.fromtimestamp(until)
if not isinstance(until, dt.datetime):
until = dt.datetime.fromtimestamp(until)
o = self._isotime_to_seconds(output)
u = self._isotime_to_seconds(until.time().isoformat()[:8])
self.assertLessEqual(o-u, 1.0)

View File

@@ -271,8 +271,8 @@ class TestFormatter(unittest.TestCase):
def test_specifier_datetime(self):
self._run_test("{ds:D%Y-%m-%dT%H:%M:%S%z}", "2010-01-01 00:00:00")
self._run_test("{ds:D%Y}", "2010-01-01T01:00:00+01:00")
self._run_test("{l:D%Y}", "None")
self._run_test("{ds:D%Y}", "1970-01-01 00:00:00")
self._run_test("{l:D%Y}", "1970-01-01 00:00:00")
def test_specifier_offset(self):
self._run_test("{dt:O 01:00}", "2010-01-01 01:00:00")